3.Spark 的 RDD 编程 02

教程 DER ⋅ 于 2023-04-15 17:42:56 ⋅ 1665 阅读

Spark的RDD编程02

9.2.1.2 键值对RDD操作

键值对RDD(pair RDD)是指每个RDD元素都是(key, value)键值对类型;

函数 目的
reduceByKey(func) 合并具有相同键的值,RDD[(K,V)] => RDD[(K,V)]按照key进行分组,并通过func 进行合并计算
groupByKey() 对具有相同键的值进行分组,RDD[(K,V)] => RDD[(K, Iterable)]只按照key进行分组,不对value合并计算
mapValues(func) 对 PairRDD中的每个值应用一个函数,但不改变键不会对值进行合并计算
flatMapValues(func) 对PairRDD 中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录
keys() 返回一个仅包含键的 RDD,RDD[(K,V)] => RDD[K]返回键不去重
values() 返回一个仅包含值的 RDD,RDD[(K,V)] => RDD[V]
sortByKey() 返回一个根据键排序的 RDD,默认是升序false:降序
Intersection union subtract
subtractByKey(other) 删掉RDD中键与other RDD中的键相同的元素
cogroup 将两个RDD中拥有相同键的数据分组到一起,RDD[(K,V)],RDD[(K, W)] => RDD[(K, (Iterable,Iterable))]
join(other) 对两个RDD进行内连接,RDD[(K,V)],RDD[(K, W)] => RDD[(K, (V, W))]相当于MySQL 的 innerjoin
rightOuterJoin 对两个RDD进行右连接,RDD[(K,V)],RDD[(K, W)] => RDD[(K, (Option[V], W))]相当于MySQL 的 rightjoin
leftOuterJoin 对两个RDD进行左连接,RDD[(K,V)],RDD[(K, W)] => RDD[(K, (V, Option[W]))]相当于MySQL 的 leftjoin

groupBykey的代码

它和groupBy相比返回值 RDD[k,Iterable[v]]

因为它也是shuffle类算子可以修改分区数量

scala> sc.textFile("/a.txt")
res51: org.apache.spark.rdd.RDD[String] = /a.txt MapPartitionsRDD[42] at textFile at <console>:25

scala> res51.flatMap(_.split(" "))
res52: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[43] at flatMap at <console>:26

scala> res52.map((_,1))
res53: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[44] at map at <console>:26

scala> res53.groupBy(_._1)
res54: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = ShuffledRDD[46] at groupBy at <console>:26

scala> res53.groupByKey()
res55: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[47] at groupByKey at <console>:26

scala> res55.mapValues(_.sum)
res56: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[48] at mapValues at <console>:26

scala> res56.collect
res57: Array[(String, Int)] = Array((tom,8), (hello,16), (world,8))             

scala> val arr = Array(("love",("taitannike",9.9)),("love",("cndqsjlg",8.5)),())
arr: Array[Any] = Array((love,(taitannike,9.9)), (love,(cndqsjlg,8.5)), ())

scala> val arr = Array(("love",("taitannike",9.9)),("love",("cndqsjlg",8.5)),("honor",("zh1",9.5)),("honor",("zh2",9.3)))
arr: Array[(String, (String, Double))] = Array((love,(taitannike,9.9)), (love,(cndqsjlg,8.5)), (honor,(zh1,9.5)), (honor,(zh2,9.3)))

scala> sc.makeRDD(arr)
res58: org.apache.spark.rdd.RDD[(String, (String, Double))] = ParallelCollectionRDD[49] at makeRDD at <console>:27

scala> res58.groupByKey()
res59: org.apache.spark.rdd.RDD[(String, Iterable[(String, Double)])] = ShuffledRDD[50] at groupByKey at <console>:26

scala> res59.collect
res60: Array[(String, Iterable[(String, Double)])] = Array((love,CompactBuffer((taitannike,9.9), (cndqsjlg,8.5))), (honor,CompactBuffer((zh1,9.5), (zh2,9.3))))

scala> res58.partitions.size
res61: Int = 9

scala> res58.groupByKey(4)
res62: org.apache.spark.rdd.RDD[(String, Iterable[(String, Double)])] = ShuffledRDD[51] at groupByKey at <console>:26

scala> res62.partitions.size
res63: Int = 4

如果用groupBykey实现distinct去重

scala> arr
res66: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6)

scala> sc.makeRDD(arr)
res67: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[53] at makeRDD at <console>:27

scala> res67.map((_,null))
res68: org.apache.spark.rdd.RDD[(Int, Null)] = MapPartitionsRDD[54] at map at <console>:26

scala> res68.groupByKey()
res69: org.apache.spark.rdd.RDD[(Int, Iterable[Null])] = ShuffledRDD[55] at groupByKey at <console>:26

scala> res69.map(_._1)
res71: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[56] at map at <console>:26

scala> res71.collect
res72: Array[Int] = Array(1, 2, 3, 4, 5, 6)

reduceBykey

按照key进行合并,不仅仅能够分组还能够合并

scala> val rdd = sc.textFile("/a.txt")
rdd: org.apache.spark.rdd.RDD[String] = /a.txt MapPartitionsRDD[58] at textFile at <console>:24

scala> rdd.flatMap(_.split(" "))
res73: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[59] at flatMap at <console>:26

scala> res73.map((_,1))
res74: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[60] at map at <console>:26

scala> res74.reduceByKey(_+_)
res75: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[61] at reduceByKey at <console>:26

scala> res75.collect
res76: Array[(String, Int)] = Array((tom,8), (hello,16), (world,8))

scala> res74.partitions.size
res77: Int = 2

scala> res75.partitions.size
res78: Int = 2

scala> res74.reduceByKey(_+_,5)
res79: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[62] at reduceByKey at <console>:26

scala> res79.partitions.size
res80: Int = 5

reduceBykey实现distinct的去重

scala> arr
res81: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6)

scala> sc.makeRDD(arr)
res82: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at makeRDD at <console>:27

scala> res82.map((_,null))
res83: org.apache.spark.rdd.RDD[(Int, Null)] = MapPartitionsRDD[64] at map at <console>:26

scala> res83.reduceByKey((a,b)=> a)
res84: org.apache.spark.rdd.RDD[(Int, Null)] = ShuffledRDD[65] at reduceByKey at <console>:26

scala> res84.collect
res85: Array[(Int, Null)] = Array((1,null), (2,null), (3,null), (4,null), (5,null), (6,null))

scala> res84.map(_._1)
res86: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[66] at map at <console>:26

scala> res86.collect
res87: Array[Int] = Array(1, 2, 3, 4, 5, 6)

distinct的源码实现方式和案例是一样的

file

groupByKey() vs reduceByKey()

file

公共调用的底层方法

file

groupByKey()

file

reduceByKey(func)

file

WordCount reduceByKey和groupByKey.map

scala> sc.textFile("/a.txt")
res0: org.apache.spark.rdd.RDD[String] = /a.txt MapPartitionsRDD[1] at textFile at <console>:25

scala> res0.flatMap(_.split(" "))
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26

scala> res1.map((_,1))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:26

scala> res2.groupByKey()
res3: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[4] at groupByKey at <console>:26

scala> res3.mapValues(_.sum)
res4: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at mapValues at <console>:26

scala> res4.collect
res5: Array[(String, Int)] = Array((tom,8), (hello,16), (world,8))     

file

scala> sc.textFile("/a.txt")
res0: org.apache.spark.rdd.RDD[String] = /a.txt MapPartitionsRDD[1] at textFile at <console>:25

scala> res0.flatMap(_.split(" "))
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26

scala> res1.map((_,1))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:26

scala> res2.reduceByKey(_+_)
res6: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:26

scala> res6.collect
res7: Array[(String, Int)] = Array((tom,8), (hello,16), (world,8))

file

file

课堂练习

以如下的数据为案例

# 首先在data文件夹下面创建一个teacher.txt
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/unclewang
http://spark.hainiubl.com/xiaohe
http://spark.hainiubl.com/xiaohe
http://spark.hainiubl.com/xiaohe
http://spark.hainiubl.com/xiaohe
http://spark.hainiubl.com/xiaohe
http://spark.hainiubl.com/xiaohe
http://spark.hainiubl.com/xiaohe
http://spark.hainiubl.com/xiaohe
http://spark.hainiubl.com/xiaohe
http://spark.hainiubl.com/laoyang
http://spark.hainiubl.com/laoyang
http://spark.hainiubl.com/laoyang
http://spark.hainiubl.com/laoyang
http://spark.hainiubl.com/laoyang
http://spark.hainiubl.com/laoyang
http://spark.hainiubl.com/laoyang
http://spark.hainiubl.com/laoyang
http://spark.hainiubl.com/laoyang
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laochen
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laoliu
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang
http://java.hainiubl.com/laozhang

需求

# 首先求出整个文档中访问量最高的前三个
# 每个专业中访问量最高的前两个

整体top3

package com.hainiu.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.net.URL
// http://java.hainiubl.com/laochen
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("teacher")
    val sc = new SparkContext(conf)
    // all top3
    // group top2
    val teacherVisit:RDD[((String,String),Int)] = sc.textFile("data/teacher.txt")
      .map(t=>{
        val url = new URL(t)
        val subject = url.getHost.split("\\.")(0)
        val teacher = url.getPath.substring(1)
        ((subject,teacher),1)
      }).reduceByKey(_+_)
    teacherVisit.sortBy(-_._2)
      .take(3)
      .foreach(println)
  }
}

分组top2

package com.hainiu.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.net.URL
// http://java.hainiubl.com/laochen
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("teacher")
    val sc = new SparkContext(conf)
    // all top3
    // group top2
    val teacherVisit:RDD[((String,String),Int)] = sc.textFile("data/teacher.txt")
      .map(t=>{
        val url = new URL(t)
        val subject = url.getHost.split("\\.")(0)
        val teacher = url.getPath.substring(1)
        ((subject,teacher),1)
      }).reduceByKey(_+_)
    val groupData:RDD[(String,Iterable[(String,Int)])] =
      teacherVisit.map(t=>(t._1._1,(t._1._2,t._2))).groupByKey()

    groupData.mapValues(t=>{
      t.toList.sortBy(-_._2).take(2)
    }).foreach(println)
  }
}

keys

values

mapValues(func)

scala> val arr = Array(1,1,2,2,3,3,4,4,5,5,6,6)
arr: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6)

scala> sc.makeRDD(arr)
res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> res8.map((_,null))
res9: org.apache.spark.rdd.RDD[(Int, Null)] = MapPartitionsRDD[8] at map at <console>:26

scala> res9.groupByKey()
res10: org.apache.spark.rdd.RDD[(Int, Iterable[Null])] = ShuffledRDD[9] at groupByKey at <console>:26

scala> res10.map(_._1)
res11: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at map at <console>:26

scala> res10.keys
res12: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[11] at keys at <console>:26

scala> res10.values
res13: org.apache.spark.rdd.RDD[Iterable[Null]] = MapPartitionsRDD[12] at values at <console>:26

scala> res10.map(_._2)
res14: org.apache.spark.rdd.RDD[Iterable[Null]] = MapPartitionsRDD[13] at map at <console>:26

scala> val arr = Array(("zhangsan",3000),("lisi",3500),("wangwu",4300))
arr: Array[(String, Int)] = Array((zhangsan,3000), (lisi,3500), (wangwu,4300))

scala> arr.mapValues(_+1000)
<console>:26: error: value mapValues is not a member of Array[(String, Int)]
       arr.mapValues(_+1000)
           ^

scala> sc.makeRDD(arr)
res16: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at makeRDD at <console>:27

scala> res16.mapValues(_+1000)
res17: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at mapValues at <console>:26

flatMapValues(func)

scala> val arr = Array("a b c","d e f")
arr: Array[String] = Array(a b c, d e f)

scala> sc.makeRDD(arr)
res18: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at makeRDD at <console>:27

scala> res18.flatMap(_.split(" "))
res19: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at flatMap at <console>:26

scala> val arr = Array(("zhangsan",Array(100,98,95)),("lisi",Array(95,35,100,82)))
arr: Array[(String, Array[Int])] = Array((zhangsan,Array(100, 98, 95)), (lisi,Array(95, 35, 100, 82)))

scala> sc.makeRDD(arr)
res20: org.apache.spark.rdd.RDD[(String, Array[Int])] = ParallelCollectionRDD[18] at makeRDD at <console>:27

scala> res20.flatMap(t=> t._2.map((t._1,_)))
res21: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[19] at flatMap at <console>:26

scala> res21.collect
res22: Array[(String, Int)] = Array((zhangsan,100), (zhangsan,98), (zhangsan,95), (lisi,95), (lisi,35), (lisi,100), (lisi,82))

scala> res20.flatMapValues()
<console>:26: error: not enough arguments for method flatMapValues: (f: Array[Int] => TraversableOnce[U])org.apache.spark.rdd.RDD[(String, U)].
Unspecified value parameter f.
       res20.flatMapValues()
                          ^

scala> res20.flatMapValues(t=> t)
res24: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[20] at flatMapValues at <console>:26

scala> val arr = Array(("zhangsan","100,98,92"),("lisi","100,93,45,60"))
arr: Array[(String, String)] = Array((zhangsan,100,98,92), (lisi,100,93,45,60))

scala> sc.makeRDD(arr)
res25: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[21] at makeRDD at <console>:27

scala> res25.flatMapValues(_.split(","))
res26: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[22] at flatMapValues at <console>:26

scala> res26.collect
res27: Array[(String, String)] = Array((zhangsan,100), (zhangsan,98), (zhangsan,92), (lisi,100), (lisi,93), (lisi,45), (lisi,60))

flatMapvalues它的功能是压平value,并且和key进行匹配,flatMapValues中放入的函数功能是怎么处理value变成集合形式

课堂练习

# 输入如下:
Array("chinese-zhangsan,lisi,wangwu"
,"math-zhangsan,zhaosi"
,"english-lisi,wangwu,zhaosi")
# 求出每个老师教授的课程都有什么?
zhangsan-chinese,math
lisi-chinese,english
wangwu-chinese,english
zhaosi-math,english
# 思路是反推
# 1.通过结果能够得出,上一步的数据是 (zhangsan,chinese),(zhangsan,math)
# 2.reduceBykey进行合并得出的
# 3.数据怎么变成的 (zhangsan,chinese),(zhangsan,math)
# 4.首先应该将chinese-zhangsan,lisi,wangwu 处理掉

整体代码:

package com.hainiu.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("parse")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val arr = Array("chinese-zhangsan,lisi,wangwu", "math-zhangsan,zhaosi", "english-lisi,wangwu,zhaosi")
    val rdd:RDD[String] = sc.makeRDD(arr)
    rdd.map(t => {
      val strs = t.split("-")
      (strs(0), strs(1))
    }).flatMapValues(_.split(","))
      .map(_.swap)
      .reduceByKey(_+","+_)
      .foreach(t=>println(t._1+"-"+t._2))
  }
}

sortByKey()

scala> sc.textFile("/a.txt").flatMap(_.split(" ")).map((_,1))
res28: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[26] at map at <console>:25

scala> res28.reduceByKey(_+_)
res29: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[27] at reduceByKey at <console>:26

scala> res29.collect
res30: Array[(String, Int)] = Array((tom,8), (hello,16), (world,8))             

scala> res29.sortByKey()
res31: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[30] at sortByKey at <console>:26

scala> res31.collect
res32: Array[(String, Int)] = Array((hello,16), (tom,8), (world,8))

scala> res29.sortByKey(false)
res33: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at sortByKey at <console>:26

scala> res33.collect
res34: Array[(String, Int)] = Array((world,8), (tom,8), (hello,16))

scala> res29.sortByKey(false,3)
res35: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[36] at sortByKey at <console>:26

scala> res35.partitions.size
res36: Int = 3

scala> res29.map(t=> (t._2,t))
res37: org.apache.spark.rdd.RDD[(Int, (String, Int))] = MapPartitionsRDD[37] at map at <console>:26

scala> res29.map(t=> (t._2,t)).values
res38: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[39] at values at <console>:26

scala> res38.collect
res39: Array[(String, Int)] = Array((tom,8), (hello,16), (world,8))

sortBy()

scala> res29
res40: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[27] at reduceByKey at <console>:26

scala> res29.collect
res41: Array[(String, Int)] = Array((tom,8), (hello,16), (world,8))

scala> res29.sortBy(-_._2)
res42: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[44] at sortBy at <console>:26

scala> res42.collect
res43: Array[(String, Int)] = Array((hello,16), (tom,8), (world,8))

scala> res29.sortBy
   def sortBy[K](f: ((String, Int)) => K,ascending: Boolean,numPartitions: Int)(implicit ord: Ordering[K],implicit ctag: scala.reflect.ClassTag[K]): org.apache.spark.rdd.RDD[(String, Int)]

scala> res29.sortBy(-_._2,false,3)
res44: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[49] at sortBy at <console>:26

scala> res44.collect
res45: Array[(String, Int)] = Array((tom,8), (world,8), (hello,16))

scala> res44.partitions.size
res46: Int = 3

file

同样groupby底层和sortby的底层实现一样,groupby底层是groupBykey

cogroup()

scala> arr
res70: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> arr1
res71: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr)
res72: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[68] at makeRDD at <console>:27

scala> sc.makeRDD(arr1)
res73: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[69] at makeRDD at <console>:27

scala> res72 cogroup res73
res74: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[71] at cogroup at <console>:28

scala> res74.collect
res75: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((zhangsan,(CompactBuffer(300),CompactBuffer(22))), (wangwu,(CompactBuffer(350),CompactBuffer(30))), (lisi,(CompactBuffer(400),CompactBuffer(24))), (zhaosi,(CompactBuffer(450),CompactBuffer())), (guangkun,(CompactBuffer(),CompactBuffer(5))))

scala> res74.mapValues(t=> t._1.sum * t._2.sum)
res76: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[72] at mapValues at <console>:26

scala> res76.collect
res77: Array[(String, Int)] = Array((zhangsan,6600), (wangwu,10500), (lisi,9600), (zhaosi,0), (guangkun,0))

join()——innerjoin

scala> val arr = Array(("zhangsan",300),("lisi",400),("wangwu",350),("zhaosi",450))
arr: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> val arr1 = Array(("zhangsan",22),("lisi",24),("wangwu",30),("guangkun",5))
arr1: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr)
res47: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[50] at makeRDD at <console>:27

scala> sc.makeRDD(arr1)
res48: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[51] at makeRDD at <console>:27

scala> res47 join res48
res49: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[54] at join at <console>:28

scala> res49.collect
res50: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

scala> res49.mapValues(t=> t._1*t._2)
res51: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[55] at mapValues at <console>:26

scala> res51.collect
res52: Array[(String, Int)] = Array((zhangsan,6600), (wangwu,10500), (lisi,9600))

leftOuterJoin()——leftjoin

scala> arr
res53: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> arr1
res54: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr)
res55: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[56] at makeRDD at <console>:27

scala> sc.makeRDD(arr1)
res56: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[57] at makeRDD at <console>:27

scala> res55 leftOuterJoin res56
res57: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[60] at leftOuterJoin at <console>:28

scala> res57.collect
res58: Array[(String, (Int, Option[Int]))] = Array((zhangsan,(300,Some(22))), (wangwu,(350,Some(30))), (lisi,(400,Some(24))), (zhaosi,(450,None)))

scala> res57.mapValues(t=> t._1*t._2.getOrElse(0))
res59: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[61] at mapValues at <console>:26

scala> res59.collect
res60: Array[(String, Int)] = Array((zhangsan,6600), (wangwu,10500), (lisi,9600), (zhaosi,0))

rightOuterJoin()——rightjoin

scala> arr
res61: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> arr1
res62: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr)
res63: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[62] at makeRDD at <console>:27

scala> sc.makeRDD(arr1)
res64: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[63] at makeRDD at <console>:27

scala> res63 rightOuterJoin res64
res65: org.apache.spark.rdd.RDD[(String, (Option[Int], Int))] = MapPartitionsRDD[66] at rightOuterJoin at <console>:28

scala> res65.mapValues(t=> t._2*t._1.getOrElse(0))
res68: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[67] at mapValues at <console>:26

scala> res68.collect
res69: Array[(String, Int)] = Array((zhangsan,6600), (wangwu,10500), (lisi,9600), (guangkun,0))

9.2.1.3 rdd上面的分区器

之前学过的kv类型上面的算子

groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的

mapValues keys values flatMapValues 普通算子,管道形式的算子

shuffle的过程是因为数据产生了打乱重分,分组、排序、join等算子需要将数据重新排版

shuffle的过程是上游的数据处理完毕写出到自己的磁盘上,然后下游的数据从磁盘上面拉取

file

重新排版打乱重分是需要存在规则的

中间数据的流向规则叫做分区器 partitioner,这个分区器一般是存在于shuffle类算子中的,我们可以这么说,shuffle类算子一定会带有分区器,分区器也可以单独存在,人为定义分发规则

groupBy groupBykey reduceBykey 自带的分区器HashPartitioner

file

sortby sortBykey rangePartitioner

file

hashPartitioner

规则 按照key的hashCode %下游分区 = 分区编号

file

file

保证取余的结果为正向结果

hash取余的方式,不管数据分发到下游的什么分区中,最终结果都是相同的数据放入到一起

演示结果

scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> sc.makeRDD(arr,3)
res78: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[73] at makeRDD at <console>:27

scala> res78.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res79: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[74] at mapPartitionsWithIndex at <console>:26

scala> res79.collect
res80: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))

scala> res78.map(t=>(t,t))
res81: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[75] at map at <console>:26

scala> res78.partitioner
res82: Option[org.apache.spark.Partitioner] = None

scala> res81.reduceByKey(_+_)
res84: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[76] at reduceByKey at <console>:26

scala> res84.partitioner
res85: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)

scala> res84.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res88: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[77] at mapPartitionsWithIndex at <console>:26

scala> res88.collect
res89: Array[(Int, (Int, Int))] = Array((0,(6,6)), (0,(3,3)), (0,(9,9)), (1,(4,4)), (1,(1,1)), (1,(7,7)), (2,(8,8)), (2,(5,5)), (2,(2,2)))

演示的逻辑,就是按照key.hashcode进行分区,int类型的hashcode值是自己的本身

并且hash分区器的规则致使我们可以任意的修改下游的分区数量

scala> res81.reduceByKey(_+_,100)
res91: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[78] at reduceByKey at <console>:26

scala> res91.partitions.size
res92: Int = 100

scala> res81.reduceByKey(_+_,2)
res93: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[79] at reduceByKey at <console>:26

scala> res93.partitions.size
res94: Int = 2

rangePartitioner

hashPartitioner规则非常简单,直接规定来一个数据按照hashcode规则的分配,规则比较简答,但是会出现数据倾斜

range分区规则中存在两个方法

file

rangeBounds界限,在使用这个分区器之前先做一个界限划定

file

首先使用水塘抽样算法,在未知的数据集中抽取能够代表整个数据集的样本,根据样本进行规则设定

然后在使用getPartitions

file

首先存在水塘抽样,规定数据的流向以后再执行整体逻辑,会先触发计算

file

sortBykey是转换类的算子,不会触发计算

file

但是我们发现它触发计算了,因为首先在计算之前先进行水塘抽样,能够规定下游的数据规则,然后再进行数据的计算

scala> arr
res101: Array[Int] = Array(1, 9, 2, 8, 3, 7, 4, 6, 5)

scala> arr.map(t=> (t,t))
res102: Array[(Int, Int)] = Array((1,1), (9,9), (2,2), (8,8), (3,3), (7,7), (4,4), (6,6), (5,5))

scala> sc.makeRDD(res102)
res104: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[94] at makeRDD at <console>:27

scala> res104.sortByKey()
res105: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[97] at sortByKey at <console>:26

scala> res105.partitioner
res106: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@fe1f9dea)

scala> res104.sortByKey(true,3)
res107: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[100] at sortByKey at <console>:26

scala> res107.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res109: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:26

scala> res109.collect
res110: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (0,(3,3)), (1,(4,4)), (1,(5,5)), (1,(6,6)), (2,(7,7)), (2,(8,8)), (2,(9,9)))

range分区器,它是先做抽样然后指定下游分区的数据界限

它可以修改分区数量,但是分区数量不能大于元素个数,必须要保证每个分区中都有元素

scala> res104.sortByKey(true,3)
res111: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[104] at sortByKey at <console>:26

scala> res104.sortByKey(true,300)
res112: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[107] at sortByKey at <console>:26

scala> res111.partitions.size
res114: Int = 3

scala> res112.part

9.2.1.4 自定义分区器

工作的过程中我们会遇见数据分类的情况,想要根据自己的需求定义分区的规则,让符合规则的数据发送到不同的分区中,这个时候我们就需要自定义分区器了

file

定义分区器,让数据发送到不同的分区,从而不同的task任务输出的文件结果内容也不同

# 自己创建数据data/a.txt
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
# 需求就是将数据按照规则进行分发到不同的分区中
# 存储的时候一个文件存储hello其他的文件存储tom jack

分区器的定义需要实现分区器的接口

class MyPartitioner extends Partitioner{
  override def numPartitions: Int = ???
// 设定下游存在几个分区
  override def getPartition(key: Any): Int = ???
// 按照key设定分区的位置
}

整体代码:

package com.hainiu.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Test1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("parse")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("data/a.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    val rdd1 = rdd.partitionBy(new MyPartitioner)
    val fs = FileSystem.get(new Configuration())
    val out = "data/res"
    if(fs.exists(new Path(out)))
      fs.delete(new Path(out),true)
    rdd1.saveAsTextFile(out)
  }
}
class MyPartitioner extends Partitioner{
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = {
    if(key.toString.equals("hello"))
      0
    else
      1
  }
}

file

课堂练习

# 需求
# 按照teacher.txt中的数据,包含专业和老师的数据
# 使用自定义分区的方式实现专业top2
# 思路
# 1 首先求出每个老师的访问量
# 2 按照专业将访问量进行分区
# 3 然后使用mapPartitions一次性取出一个分区进行排序
package com.hainiu.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import java.net.URL

object Test2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("parse")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd:RDD[((String,String),Int)] = sc.textFile("data/teacher.txt")
      .map(t=>{
        val url = new URL(t) //spark.hainiubl.com/unclewang
        val host = url.getHost //spark.hainiubl.com
        val path = url.getPath // /unclewang
        val subject = host.split("\\.")(0)
        val teacher = path.substring(1)
        ((subject,teacher),1)
      }).reduceByKey(_+_)

    val rddSubject:RDD[String] = rdd.keys.map(_._1).distinct()
    val arrSubject = rddSubject.collect()

    val rdd1 = rdd.partitionBy(new MyTeacherPartitioner(arrSubject))
    rdd1.mapPartitions(it=>{
      it.toList.sortBy(-_._2).take(2).toIterator
    }).foreach(println)
  }
}
class MyTeacherPartitioner(arr:Array[String]) extends Partitioner{
  override def numPartitions: Int = arr.size

  override def getPartition(key: Any): Int = {
    val subject = key.asInstanceOf[(String,String)]._1
    arr.indexOf(subject)
  }
}

file

9.2.1.5 join的原理

join是两个结果集之间的链接,需要进行数据的匹配

演示一下join是否存在shuffle

scala> val arr = Array(("zhangsan",300),("lisi",400),("wangwu",350),("zhaosi",450))
arr: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> val arr1 = Array(("zhangsan",22),("lisi",24),("wangwu",30),("guangkun",5))
arr1: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr,3)
res116: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[108] at makeRDD at <console>:27

scala> sc.makeRDD(arr1,3)
res117: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[109] at makeRDD at <console>:27

scala> res116 join res117
res118: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[112] at join at <console>:28

scala> res118.collect
res119: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

如果两个rdd没有分区器,分区个数一致

file

file

分区个数不变

file

如果分区个数不一致,产生的rdd的分区个数以多的为主

scala> sc.makeRDD(arr,3)
res128: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[118] at makeRDD at <console>:27

scala> sc.makeRDD(arr1,3)
res129: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[119] at makeRDD at <console>:27

scala> res128.reduceByKey(_+_)
res130: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[120] at reduceByKey at <console>:26

scala> res129.reduceByKey(_+_)
res131: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[121] at reduceByKey at <console>:26

scala> res130 join res131
res132: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[124] at join at <console>:28

scala> res132.collect
res133: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

scala> res132.partitions.size
res134: Int = 3

如果分区个数一样并且分区器一样,那么是没有shuffle的

file

分区个数和原来的一样

scala> val arr = Array(("zhangsan",300),("lisi",400),("wangwu",350),("zhaosi",450))
arr: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala>  val arr1 = Array(("zhangsan",22),("lisi",24),("wangwu",30),("guangkun",5))
arr1: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr,3)
res0: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:27

scala> sc.makeRDD(arr1,4)
res1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:27

scala> res0.reduceByKey(_+_)
res2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:26

scala> res1.reduceByKey(_+_)
res3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:26

scala> res2 join res3
res4: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[6] at join at <console>:28

scala> res4.collect
res5: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

scala> res4.partitions.size
res6: Int = 4

都存在分区器但是分区个数不同,也会存在shuffle

file

分区个数以多的为主

scala> arr
res7: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> arr1
res8: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr,3)
res9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> sc.makeRDD(arr,4)
res10: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at makeRDD at <console>:27

scala> res9.reduceByKey(_+_)
res11: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:26

scala> res10 join res11
res12: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:28

scala> res12.partitions.size
res13: Int = 3

scala> res12.collect
res14: Array[(String, (Int, Int))] = Array((zhangsan,(300,300)), (wangwu,(350,350)), (lisi,(400,400)), (zhaosi,(450,450)))

file

一个带有分区器一个没有分区器,那么以带有分区器的rdd分区数量为主,并且存在shuffle

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-DER,http://hainiubl.com/topics/76296
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter