请选择 进入手机版 | 继续访问电脑版

好程序员-千锋教育旗下高端IT职业教育品牌

400-811-9990
我的账户
好程序员

专注高端IT职业培训

亲爱的猿猿,欢迎!

已有账号,请

如尚未注册?

[BigData] 大数据教程:Transformation和Action算子演示

[复制链接]
叶子老师 发表于 2019-6-17 14:34:49 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
大数据教程Transformation和Action算子演示
一、Transformation算子演示
val conf = new SparkConf().setAppName("Test").setMaster("local")
      val sc = new SparkContext(conf)

    //通过并行化生成rdd
    val rdd = sc.parallelize(List(5,6,4,7,3,8,2,9,10))

    //map:rdd里面每一个元乘以2然后排序
    val rdd2: RDD[Int] = rdd.map(_ * 2)
    //collect以数组的形式返回数据集的所有元素(Action算子)
    println(rdd2.collect().toBuffer)

    //filter:RDD由经过func函数计算后返回值为true的输入元素组成
    val rdd3: RDD[Int] = rdd2.filter(_ > 10)
    println(rdd3.collect().toBuffer)

    val rdd4 = sc.parallelize(Array("a b c","b c d"))
    //flatMap:rdd4中的元素进行切分后压平
    val rdd5: RDD[String] = rdd4.flatMap(_.split(" "))
    println(rdd5.collect().toBuffer)
    //假如: List(List(" a,b" ,"b c"),List("e c"," i o"))
    //压平 flatMap(_.flatMap(_.split(" ")))
   
    //sample随机抽样
    //withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样
    //fraction抽样比例例如30% 0.3 但是这个值是一个浮动的值不准确
    //seed用于指定随机数生成器种子 默认参数不传
    val rdd5_1 = sc.parallelize(1 to 10)
    val sample = rdd.sample(false,0.5)
    println(sample.collect().toBuffer)

    //union:求并集
    val rdd6 = sc.parallelize(List(5,6,7,8))
    val rdd7 = sc.parallelize(List(1,2,5,6))
    val rdd8 = rdd6 union rdd7
    println(rdd8.collect.toBuffer)

    //intersection:求交集
    val rdd9 = rdd6 intersection rdd7
    println(rdd9.collect.toBuffer)

    //distinct:去重出重复
    println(rdd8.distinct.collect.toBuffer)

    //join相同的key会被合并
    val rdd10_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2)))
    val rdd10_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10)))
    val rdd10_3 = rdd10_1 join rdd10_2
    println(rdd10_3.collect().toBuffer)
   
    //左连接和右连接
    //除基准值外是Option类型,因为可能存在空值所以使用Option
    val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2 //以左边为基准没有是null
    val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2 //以右边为基准没有是null
    println(rdd10_4.collect().toList)
    println(rdd10_5.collect().toBuffer)

    val rdd11_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2)))
    val rdd11_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10)))
    //笛卡尔积
    val rdd11_3 = rdd11_1 cartesian rdd11_2
    println(rdd11_3.collect.toBuffer)
  
   //根据传入的参数进行分组
    val rdd11_5_1 = rdd11_4.groupBy(_._1)
    println(rdd11_5_1.collect().toList)

    //按照相同key进行分组,并且可以制定分区
    val rdd11_5_2 = rdd11_4.groupByKey
    println(rdd11_5_2.collect().toList)

    //根据相同key进行分组[分组的话需要二元组
    //cogroup groupBykey的区别
    //cogroup不需要对数据先进行合并就以进行分组 得到的结果是 同一个key 和不同数据集中的数据集合
    //groupByKey是需要先进行合并然后在根据相同key进行分组
    val rdd11_6: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2
    println(rdd11_6)
二、Action算子演示
val conf = new SparkConf().setAppName("Test").setMaster("local
  • ")
        val sc = new SparkContext(conf)
        /* Action 算子*/
        //集合函数
        val rdd1 = sc.parallelize(List(2,1,3,6,5),2)
        val rdd1_1 = rdd1.reduce(_+_)
        println(rdd1_1)
        //以数组的形式返回数据集的所有元素
        println(rdd1.collect().toBuffer)
        //返回RDD的元素个数
        println(rdd1.count())
        //取出对应数量的值 默认降序, 若输入0 会返回一个空数组
        println(rdd1.top(3).toBuffer)
        //顺序取出对应数量的值
        println(rdd1.take(3).toBuffer)
        //顺序取出对应数量的值 默认生序
        println(rdd1.takeOrdered(3).toBuffer)
        //获取第一个值 等价于 take(1)
        println(rdd1.first())
        //将处理过后的数据写成文件(存储在HDFS或本地文件系统)
        //rdd1.saveAsTextFile("dir/file1")
        //统计key的个数并生成map kkeyvkey的个数
        val rdd2 = sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2)
        val rdd2_1: collection.Map[String, Long] = rdd2.countByKey()
        println(rdd2_1)
        //遍历数据
        rdd1.foreach(x => println(x))

        /*其他算子*/
        //统计value的个数 但是会将集合中的一个元素看做是一个vluae
        val value: collection.Map[(String, Int), Long] = rdd2.countByValue
        println(value)
        //filterByRange:RDD中的元素进行过滤,返回指定范围内的数据
        val rdd3 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1)))
        val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e")//包括开始和结束的
        println(rdd3_1.collect.toList)
        //flatMapValues对参数进行扁平化操作,value的值
        val rdd3_2 = sc.parallelize(List(("a","1 2"),("b","3 4")))
        println( rdd3_2.flatMapValues(_.split(" ")).collect.toList)
        //foreachPartition 循环的是分区数据
        // foreachPartiton一般应用于数据的持久化,存入数据库,可以进行分区的数据存储
        val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
        rdd4.foreachPartition(x => println(x.reduce(_+_)))
        //keyBy 以传入的函数返回值作为key ,RDD中的元素为value 新的元组
        val rdd5 = sc.parallelize(List("dog","cat","pig","wolf","bee"),3)
        val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length)
        println(rdd5_1.collect.toList)
        //keys获取所有的key  values 获取所有的values
        println(rdd5_1.keys.collect.toList)
        println(rdd5_1.values.collect.toList)
        //collectAsMap  将需要的二元组转换成Map
        val map: collection.Map[String, Int] = rdd2.collectAsMap()
        println(map)
  • 好程序员大数据教程:http://www.goodprogrammer.org/bigdata.shtml

    精彩内容,一键分享给更多人!
    回复

    使用道具 举报

    您需要登录后才可以回帖

    本版积分规则

    关注我们
    好程序员
    千锋好程序员

    北京校区(总部):北京市海淀区宝盛北里西区28号中关村智诚科创大厦

    深圳西部硅谷校区:深圳市宝安区宝安大道5010号深圳西部硅谷B座A区605-619

    杭州龙驰智慧谷校区:浙江省杭州市下沙经济技术开发区元成路199号龙驰智慧谷B座7层

    郑州校区:郑州市二七区航海中路60号海为科技园C区10层、12层

    Copyright 2007-2019 北京千锋互联科技有限公司 .All Right

    京ICP备12003911号-5 京公安网11010802011455号

    请您保持通讯畅通1对1咨询马上开启