请选择 进入手机版 | 继续访问电脑版

好程序员-千锋教育旗下高端IT职业教育品牌

400-811-9990
我的账户
好程序员

专注高端IT职业培训

亲爱的猿猿,欢迎!

已有账号,请

如尚未注册?

[BigData] 好程序员大数据培训大数据的架构体系

[复制链接]
叶子老师 发表于 2019-8-13 17:10:04 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
好程序员大数据培训分享大数据的架构体系:
            flume采集数据
            MapReduce
            HBse (HDFS)
            Yarn   资源调度系统
  展示平台 数据平台
            1,提交任务
            2,展示结果数据
  spark 分析引擎  S3   可以进行各种的数据分析 , 可可以和hive进行整合 ,spark任务可以运行在Yarn
提交任务到集群的入口类    SC
为什么用spark :   速度快,易用,通用,兼容性高
hadoop
scala
jdk
spark
如果结果为定长的  toBuffer编程变长的
启动流程
spark集群启动流程  和任务提交
主节点 master
子节点work   多台
start-allsh 脚本 先启动master服务      启动work
master  提交注册信息   work 响应   work会定时发送心跳信息
集群启动流程
      1、调用start-all脚本   ,开始启动Master
      2master启动以后,preStart方法调用了一个定时器,定时的检查超时的worker
      3、启动脚本会解析slaves配置文件,找到启动work的相应节点,开始启动worker
      4worker服务启动后开始调用prestart方法(生命周期方法)开始向所有的master注册
      5master接收到work发送过来的注册信息,master 开始保存注册信息并把自己的URL响应给worker
      6worker接收到masterURL后并更新,开始掉用一个定时器,定时的向master发送心跳信息
任务提交流程
将任务rdd通过客户端submit 提交给Master  的管道 (队列:先进先出)
                      worker启动Executor子进程   来从master拿取任务信息
                      Executor  向客户端Driver端注册
                         客户端收到注册信息  客户端就会将任务给 Executor进行人物计算
任务提交流程
        1、首先Driver端会通过spark-submit脚本启动sparkSubmint进程,此时开始创建重要的对象(SparkContext),启动后开始向Master发送信息开始通信
        2Master接收到发送过来的信息后,开始生成任务信息,并把任务信息放到队列中
        3master开始把所有有效的worker过滤出来并进行排序,按照空闲的资源进行排序
        4Master开始向有效的worker通知拿取任务信息,并启动相应的Executor
        5worker启动Executor ,并向Driver反向注册
        6Driver开始把生成的task发送给相应的ExecutorExecutor
WordCount中产生的RDD
hdfs上有三个文件  sc.textFile(“路径”)方法  生成第一个RDD  HadoopRDD    第二个RDD  MapPartitionsRDD  flatMap(_.split()"") 生成 第三个RDD  MapPartitionsRDD
                                map((_,1))生成第四个RDD  MapPartitionsRDD    reduceByKey  生成第五个 ShuffledRDD      saveAsTextFile  生成第六个RDD MapPartitionsRDD
.toDebugString  可以看出RDD
分区
Partition  后跟分区  分区本身不会改变  会生成以一个新的RDD分区为修改后  因为rdd本身不可变   修改后大于原本分区的会发生shullfer  小于的不会发生
coalesce    后跟分区少于原来的分区则会改变  因为不会发生shuffle  大于时则不可改变
PartitionBy  后跟新的分区器 new  全名称的分区器org.apache.spark.hparPartition
客户端提交Job任务信息给Master
Master生成任务信息 master 生成任务信息描述任务的数据   通知work 创建相应的Executor
      客户端将job信息给work  workExecutor 进行计算数据
object Demo {
  def main(args: Array[String]): Unit = {
//SparkConf:构架配置信息类,优先于集群配置文件
//setAppName:指定应用程序名称,如果不指定,会自动生成类似于uuid产生的名称
//setMaster:指定运行模式:local[1]-用一个线程模拟集群运行,local[2] -用两个集群模拟线程集群运行,local
  • -有多少线程就用多少线程运行
  •     val conf= new SparkConf().setAppName("")    // setAppName起名称  setMaster  在哪里运行  是本地还是  []是调用多少线程来运行
    .setMaster("local[2]") //在打包上传集群时  不需要这一步直接删除或是注释掉
    //创建提交任务到集群的入口类(上下文对象)
          val sc =  new SparkContext(conf)
    //获取hdfs的数据
    val lines = sc.textFile("hdfs://suansn:9000/wc")
    val words= lines.flatMap(_.split(" ")) // 切分后生成单词
    val tuples=words.map((_,1))   //将单词生成一个元组
    val  sum= tuples.reduceBykey(_+_)  // 进行聚合
    val PX = sum.sortBy(_._2,false)  // 倒叙拍寻
    print(PX.collect.toBuffer) // 打印至控制台   在打包上传集群时  不需要这一步直接删除或是注释掉
    PX.saveAsTextFile("hdfs://suansn:9000/ssss")
    sc.stop     //释放资源
      }
    }
    RDD     提供的方法  叫做算子
    RDD  数据集 数据的抽象  是一种类型,提供方法处理数据    分布式  仅仅是指向数据 ,   不可变 如果想要其他的操作 就在另外定义一个 RDD 。 可分区   如果一个文件 小于128M  就是一个分区 如果大于将根据大小来分区
    一组分片   一个计算每个分区的函数   RDD之间的依赖关系   一个Partitioner,即RDD的分片函数。   一个列表,存储存取每个Partition的优先位置(preferred location)。
    RDD  有两种类型      一个算子对应一个Actionjob
          1 Transformation  转换的类型    延迟加载   只是记录计算过程 并不执行     只有调用  Action 类型的算子后  触发job 生成计算
                          如果没有Transformation 算子  而全是Action算子  就无法优化  集群一直处于繁忙状态。
          2Action
    sc.parallelize 并行方法创建RDD
    val rdd1 = sc.parallelize(List(3,4,6,5,8,7,9,2,1),2)
       每个数据乘10
    val rdd2 = rdd1.map(_*10)
                                      Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)
    利用分区计算  mapPartitions
    val rdd2= rdd1.mapPartitions(_.map(_*10))     //map_  表示每个分区的数据  封装到Iterator
                                    Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)
    mapWith               //map的变异  也可将元素数据遍历出来  将分区号作为输入 返回到A类型作为输出
            (constructA: Int => A)(f: (T, A) => RDD[U])
    参数列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U)    // Int => A 操作的每个分区的分区号,preservesPartitioning: Boolean = false  是否记录rdd的分区信息                                (T, A)   Trdd中的元素
                                            //        实现了柯里化的步骤  两个A的传入
    rdd1.mapWith(i => i*10)((a, b) => b+2).collect    //分区号 i  乘以10   B接收 A RDD的元素
                                            Array[Int] = Array(2,2,2,2,12,12,12,12,12)
    flatMapWith   //分区排序
    (constructA: Int => A)(f: (T, A) => Seq[U])
    参数列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U])
    rdd1.flatMapWith(i => i, true)((x, y) => List((y, x))).collect  // i为分区号   原样不懂输出   true 相当于 允许记录分区信息    Y为拿到的分区号  X RDD的元素
                                            Array[Int,Int)] = Array((0,3)(0,4)(0,6)(0,5)(1,8)(1,7)(1,9)(1,2)(1,1))
    mapPartitions   f: Iterator[T] => Iterator[U]
    rdd1.mapPartitions(_.toList.reverse.iterator).collect    //  每个分区颠倒排列
                                                            Array[Int] = Array(5, 6, 4, 3, 1, 2, 9, 7, 8)
    mapPartitionsWithIndex           循环分区并可以操作分区号
    参数列表:(f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)      //Iterator[(Int)  分区信息   index: Int  分区号
    val func = (index: Int, iter: Iterator[(Int)]) => {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
    }
    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    rdd1.mapPartitionsWithIndex(func).collect
                                                            Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])
    aggregate    //  聚合算子
    (zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
    }
    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    rdd1.mapPartitionsWithIndex(func1).collect
                                                            Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])
    rdd1.aggregate(0)(math.max(_, _), _ + _)    // 循环时  第一个_ 拿到的时初始值0  第二个_拿到的0分区第一个元素  然后判断最大值  依次类推     局部聚合完,最后全局聚合时   初始值+ 0分区的最大值。第1分区的最大值
                                            Int=13
    rdd1.aggregate(5)(math.max(_, _), _ + _)   //原理和上面的相同不过初始值时5   这样得到的第0 分区的最大值就是 初始值  5  1分区的最大值还是9    最后的全局聚合时  就是5 + 5+9
                                            Int=19
    val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
    def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
    }
    rdd2.mapPartitionsWithIndex(func2).collect
                                            Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])
    rdd2.aggregate("")(_ + _, _ + _)  //全局聚合和局部聚合    都属于字符串拼接   初始值为空
                                            String = abcdef   String = defabc  //因为不确定那个分区先完成任务所以 会出现两种结果
    rdd2.aggregate("=")(_ + _, _ + _)
                                            String = ==abc=def
    val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
    rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) // 取每个字符串的长度   第一次与初始值 比较 而后用第二个数据的长度与上一次比较后的长度相比较,   最后全局聚合时 两个分区最长的字符串和初始值相加
                                                            String = 24  String = 42
    val rdd4 = sc.parallelize(List("12","23","345",""),2)
    rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)  //  运算方法与上面的相同 这个求的字符串是最短的 因为在第二个分区内有个空数据字符串为0   第一个分区的因为初始值也为空 所以为空   tostring后第一次的变为字符串 0 长度为1 全局后为10
                                                            String = 10
    val rdd5 = sc.parallelize(List("12","23","","345"),2)
    rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)         与上面相同
                                                            String = 11
    aggregateByKey   通过相同的key 进行聚合
    (zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)
    //Partitioner  分区器
    val pairRDD = sc.parallelize(List(("mouse", 2),("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12)), 2)
    def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
      }
    pairRDD.mapPartitionsWithIndex(func2).collect
      //        全局聚合时 不会加 初始值
        pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect  // 相同的keyvalue值进行操作
      pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
      combineByKey   // 聚合的算子
          (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
      val rdd1 = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
    val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
      rdd2.collect
      val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    rdd.collect
      val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
      val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
      val rdd6 = rdd5.zip(rdd4)
      val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m
      countByKey
      val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
      rdd1.countByKey   //相同key value的个数
      rdd1.countByValue // 把整个rdd看成Value
      filterByRange  //给定范围  求
      val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
      val rdd2 = rdd1.filterByRange("c", "d")
      rdd2.collect
      flatMapValues
      val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
      rdd3.flatMapValues(_.split(" "))
      foldByKey
      val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
      val rdd2 = rdd1.map(x => (x.length, x))
      val rdd3 = rdd2.foldByKey("")(_+_)
      val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))
      rdd.foldByKey(0)(_+_)
      foreachPartition  //
      val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
      rdd1.foreachPartition(x => println(x.reduce(_ + _)))  表示每个分区的数据的聚合值
      keyBy
      val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
      val rdd2 = rdd1.keyBy(_.length)   元素数据的长度生成为key 元素数据生成为value
      rdd2.collect
      keys values
      val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
      val rdd2 = rdd1.map(x => (x.length, x))
      rdd2.keys.collect
      rdd2.values.collect
      checkpoint
      sc.setCheckpointDir("hdfs://node01:9000/cp")
      val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
      rdd.checkpoint   checkpoint后的文件 准备存储  还未存储 没有Action 算子没有运行job
      rdd.isCheckpointed  查看是否运行checkpoint
      rdd.count     随便调动Avtion的算子 提交job
      rdd.isCheckpointed
      rdd.getCheckpointFile   查看checkpoint的文件存储的位置
      repartition, coalesce, partitionBy
      val rdd1 = sc.parallelize(1 to 10, 3)
      val rdd2 = rdd1.coalesce(2, false)
      rdd2.partitions.length
      collectAsMap   Array 转换map kv)对
      val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
      rdd.collectAsMap
    在一定时间范围内,求所有用户在经过所有基站停留时间最长的TOP2
    思路:获取用户产生的log日志并切分
          用户在基站停留的总时长
          过去基站的基础信息
          把经纬度信息join到用户数据中
          求出用户在某些基站停留的时长的TOP2
          object Demo  {
            def main(args: Array[String]): Unit = {
    //模板代码
    val   conf = new SparkConf()
    .setAppName("ML")
    .setMaster("local[2]")
    val sc= new SparkContext(conf)
    //获取用户访问基站的log
    val files=sc.textFile("地址")
    //切分用户的log
    val userInfo=files.map(line=>{
    val fields=line.split(",")
    val phone = fields(0)//用户手机号
    val time = fields(1).toLong//时间戳
    val lac = fields(2) //基站ID
    val eventType = fields(3)//事件类型
    val time_long = if(eventType.equals("1")) -time else time
    ((phone,lac),time_long)
    })
    //用户在相同的基站停留的总时长
    val  sumedUserAndTime  = userInfo.reduceByKey(_+_)
    //为了便于和基站基础信息进行Join 需要把数据调整,把基站ID作为key
    val lacAndPhoneAndTime sumedUserAndTime.map(tup =>{
    val phone = tup._1._1 //用户手机号
    val lac= tup._1._2//基站的ID
    val time = tup._2 //用户在某个基站停留的总时长
    (lac,(phone,time))
    })
    //获取基站的基础信息
    val lacInfo= sc.textFile("路径")
    //切分基站基础数据
    val lacAndXY=lacInfo.map (line =>{
    val fields = line.split(",")
    val lac= fields(0)//基站ID
    val x = files(1)//经度
    val y = fields(2)//纬度
    (lac,(x,y))
    })
    //把经纬度信息join到用户的访问信息
    val  joined=lacAndPhoneAndTime join  lacAndXY
    //为了便于以后发呢组排序计算,需要整合数据
    val phoneAndTimeAndXY=joined,map(tup=>{
    val phone = tup._2._1._1//手机号
    val lac = tup._1// ID
    val time  = tup._2._1._2
    val xy = tup._2._2 //经纬度
    phone,time,xy
    })
    //按照用户手机号进行分组
    val grouped=phoneAndTimeAndXY.groupBy(_._1)
    //按照时长进行组内排序
    //val  sorted = grouped.map(x => (x._,x._2.toList.sortBy(_._2).reverse))
    val  sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse)
    //整合数据
    val filterede=sorted.map(tup =>{
    val phone= tup._1
    val list = tup._2
    val filteredList=list.map(x =>{
    val time  = x._2
    val xy = x._3
      List(time,xy)
    })
    (phone,filteredList)
    })
    val res = filterede.mapValues(_.take(2))
    sc.stop()
            }
          }
    好程序员大数据培训官网:http://www.goodprogrammer.org/bigdata.shtml

    精彩内容,一键分享给更多人!
    回复

    使用道具 举报

    您需要登录后才可以回帖

    本版积分规则

    关注我们
    好程序员
    千锋好程序员

    北京校区(总部):北京市海淀区宝盛北里西区28号中关村智诚科创大厦

    深圳西部硅谷校区:深圳市宝安区宝安大道5010号深圳西部硅谷B座A区605-619

    杭州龙驰智慧谷校区:浙江省杭州市下沙经济技术开发区元成路199号龙驰智慧谷B座7层

    郑州校区:郑州市二七区航海中路60号海为科技园C区10层、12层

    Copyright 2007-2019 北京千锋互联科技有限公司 .All Right

    京ICP备12003911号-5 京公安网11010802011455号

    请您保持通讯畅通1对1咨询马上开启