阅读 79

Spark订单量的实时统计项目

Spark订单量的实时统计项目

需求:

1.各省份营业额的实时统计
2.各省份订单量的实时统计

数据:

orderId                   provinceId   orderPrice
201710261645320001,           12,          45.00

第一步:编写Scala代码模拟KafkaProducer产生订单数据

ConstantUtils.scala

定义Kafka相关的集群配置信息 Producer Configs

//存储常量数据
object ConstantUtils {
  /**
   * 定义Kafka相关的集群配置信息
   */
  //kafka集群配置信息
  val METADATA_BROKER_LIST = "bigdata02:9092"
  //序列化类
  val SERIALIZER_CLASS = "kafka.serializer.StringEncoder"
  //发送数据的方式
  val PRODUCER_TYPE = "async"
  //Topic名称
  val ORDER_TOPIC = "test1026"
  //OFFSET
  val AUTO_OFFSET_RESET = "largest"
}

OrderProducer.scala

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}


object OrderProducer {
  def main(args: Array[String]): Unit = {
    //存储kafka集群相关的配置信息
    val props = new Properties()
    //创建生产者
    var producer: Producer[String, String] = null

    //kafka集群
   props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST)
    //设置向topic中存储数据的方式
    props.put("producer.type", ConstantUtils.PRODUCER_TYPE)
    //设置key和message的序列化类
    props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS)
    props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS)

    try{

      //生产者的配置信息
      val config = new ProducerConfig(props)
      //创建一个producer实例对象
      producer = new Producer[String, String](config)
      //发送数据
      val message = new KeyedMessage[String,String](ConstantUtils.ORDER_TOPIC,key = "201710261645320001",message = "201710261645320001,12,45.00")
      //发送数据
      producer.send(message)

    }catch {
      //打印出来
      case e: Exception => e.printStackTrace()
    }finally {
      //最后关闭producer
      if (null != producer) producer.close()
    }
  }
}

创建主题:

bin/kafka-topics.sh --create --zookeeper bigdata02:2181 --replication-factor 1 --partitions 3 --topic test1026

启动消费者

cd /home/bigdata/apps/kafka_2.11-0.10.0.0/

./bin/kafka-console-consumer.sh  --bootstrap-server bigdata02:9092 --zookeeper  bigdata02:2181 --topic test1026 --from-beginning

第二步:模拟产生Json格式订单数据批量发送到KafkaTopic

OrderProducer.scala

import java.util.{Properties, UUID}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.collection.mutable.ArrayBuffer

//订单实体类
case class Order(orderId: String, provinceId: Int, price: Float)

object OrderProducer {
  def main(args: Array[String]): Unit = {

    /**
     * JackSon Object类
     */
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)


    //存储kafka集群相关的配置信息
    val props = new Properties()
    //创建生产者
    var producer: Producer[String, String] = null

    //kafka集群
   props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST)
    //设置向topic中存储数据的方式
    props.put("producer.type", ConstantUtils.PRODUCER_TYPE)
    //设置key和message的序列化类
    props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS)
    props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS)

    try{

      //生产者的配置信息
      val config = new ProducerConfig(props)
      //创建一个producer实例对象
      producer = new Producer[String, String](config)
      //创建一个存储Message的可变数组
      val messageArrayBuffer = new ArrayBuffer[KeyedMessage[String, String]]()

      //模拟一直产生N条订单数据
      while (true) {

        //清空一下数组中的message
        messageArrayBuffer.clear()

        //随机数用于确定每次随机产生订单的数目
        val random: Int = RandomUtils.getRandomNum(1000) + 100

        //生成订单数据
        for (index <- 0 until random) {
          //订单id
          val orderId = UUID.randomUUID().toString
          //省份id
          val provinceId = RandomUtils.getRandomNum(34) + 1
          //订单金额
          val orderPrice = RandomUtils.getRandomNum(80) + 100.5F
          //创建订单实例
          val order = Order(orderId, provinceId, orderPrice)
          //TODO:如何将实体类转换为Json格式

          //创建一个message
          val message = new KeyedMessage[String, String](ConstantUtils.ORDER_TOPIC, "orderId", mapper.writeValueAsString(order))
          //打印Json数据
           println(s"Order Json: ${mapper.writeValueAsString(order)}")
          //发送数据
          //producer.send(message)
          //向meaage数组里面增加message
          messageArrayBuffer += message
        }
        //批量发送到topic
        producer.send(messageArrayBuffer: _*)
        //休息一下
        Thread.sleep(RandomUtils.getRandomNum(100) * 10)
      }
    }catch {
      //打印出来
      case e: Exception => e.printStackTrace()
    }finally {
      //最后关闭producer
      if (null != producer) producer.close()
    }
  }
}


第三步:基于SCALA的贷出模式编写SparkStreaming编程模块

OrderTurnoverStreaming.scala

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}

//从kafka获取数据,统计订单量和订单总值
object OrderTurnoverStreaming {

  System.setProperty("HADOOP_USER_NAME", "bigdata")

  val BATCH_INTERVAL: Duration = Seconds(2)

  //检查点目录
  val CHECKPOINT_DIRECTORY:String = "hdfs://bigdata02:9000/sparkckpt1027"

  //贷出函数
  def sparkOperation(args: Array[String])(operation: StreamingContext => Unit) = {
    //创建StreamingContext对象的函数
    def functionToCreateContext(): StreamingContext = {

      //创建配置对象
      val sparkConf = new SparkConf()
        //应用名称
        .setAppName("OrderTurnoverStreaming")
        //设置运行模式
        .setMaster("local[3]")

      //创建SparkContext上下文对象
      val sc = SparkContext.getOrCreate(sparkConf)
      //设置日志级别
      sc.setLogLevel("WARN")
      //创建ssc
      val ssc: StreamingContext =  new StreamingContext(sc, BATCH_INTERVAL)
      //调用用户函数
      operation(ssc)
      //确保交互式查询时候,数据不被删除
      ssc.remember(Minutes(1))
      //设置一下检查点目录
      ssc.checkpoint(CHECKPOINT_DIRECTORY)
      //返回ssc对象
      ssc

    }

    var context:StreamingContext = null
    try{
      //stop any esxiting StreamingContext
      val stopActiveContext = true
      if (stopActiveContext) {
        StreamingContext.getActive().foreach(_.stop(stopSparkContext = true))
      }

      // context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
      context = StreamingContext.getActiveOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)

      //设置日志级别 ,如果从检查点恢复创建StreamingContext,日志级别不生效,就重新设置
      context.sparkContext.setLogLevel("WARN")
      //start计算
      context.start()
      //wait 停止
      context.awaitTermination()
    }catch {
      case e: Exception => e.printStackTrace()
    } finally {
      context.stop(stopSparkContext = true,stopGracefully = true)
    }

  }

  def main(args: Array[String]): Unit = {
    sparkOperation(args)(processOrderData)
  }

  //用户函数   真正编写程序逻辑的地方
  def processOrderData(ssc: StreamingContext): Unit = {

  }

}

第四步:从Kafka的订单Topic读取及编程实现【实时累加统计各省份销售营业额】

OrderTurnoverStreaming

import java.text.SimpleDateFormat
import java.util.Date

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}

//从kafka获取数据,统计订单量和订单总值
object OrderTurnoverStreaming {

  System.setProperty("HADOOP_USER_NAME", "bigdata")

  val BATCH_INTERVAL: Duration = Seconds(2)

  //检查点目录
  val CHECKPOINT_DIRECTORY:String = "hdfs://bigdata02:9000/sparkckpt1027"

  //贷出函数
  def sparkOperation(args: Array[String])(operation: StreamingContext => Unit) = {
    //创建StreamingContext对象的函数
    def functionToCreateContext(): StreamingContext = {

      //创建配置对象
      val sparkConf = new SparkConf()
        //应用名称
        .setAppName("OrderTurnoverStreaming")
        //设置运行模式
        .setMaster("local[3]")

      //创建SparkContext上下文对象
      val sc = SparkContext.getOrCreate(sparkConf)
      //设置日志级别
      sc.setLogLevel("WARN")
      //创建ssc
      val ssc: StreamingContext =  new StreamingContext(sc, BATCH_INTERVAL)
      //调用用户函数
      operation(ssc)
      //确保交互式查询时候,数据不被删除
      ssc.remember(Minutes(1))
      //设置一下检查点目录
      ssc.checkpoint(CHECKPOINT_DIRECTORY)
      //返回ssc对象
      ssc

    }

    var context:StreamingContext = null
    try{
      //stop any esxiting StreamingContext
      val stopActiveContext = true
      if (stopActiveContext) {
        StreamingContext.getActive().foreach(_.stop(stopSparkContext = true))
      }

      // context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
      context = StreamingContext.getActiveOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)

      //设置日志级别 ,如果从检查点恢复创建StreamingContext,日志级别不生效,就重新设置
      context.sparkContext.setLogLevel("WARN")
      //start计算
      context.start()
      //wait 停止
      context.awaitTermination()
    }catch {
      case e: Exception => e.printStackTrace()
    } finally {
      context.stop(stopSparkContext = true,stopGracefully = true)
    }

  }

  def main(args: Array[String]): Unit = {
    sparkOperation(args)(processOrderData)
  }

  //用户函数   真正编写程序逻辑的地方
  def processOrderData(ssc: StreamingContext): Unit = {
    /**
     * 1.从Kafka topic 里面获取数据  direct模式
     */

    //设置kafka连接的相关信息(第一个参数)
    val kafkaParams: Map[String, String] = Map(
      "metadata.broker.list" -> ConstantUtils.METADATA_BROKER_LIST,
      "auto.offset.reset" -> ConstantUtils.AUTO_OFFSET_RESET
    )
    //Topics(第二参数)
    val topicsSet = ConstantUtils.ORDER_TOPIC.split(",").toSet

    //采用直接方式从kafka topic里面读取数据
    val orderDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    //需求一开始
        /**
     * 2.处理订单数据  需求一:实时累加统计各省份的营业额
     */

    val orderTurnoverDStream = orderDStream
      //a.解析json格式数据,数据转换为DStream[(key,value)]
      .map(tuple => {
        //解析Json格式数据为Order
        val order = ObjectMapperSingleton.getInstance().readValue(tuple._2, classOf[Order])
        //返回
        (order.provinceId, order)
      }
      )
      //b.使用updateStateByKey 进行实时累加统计 --各省份
      .updateStateByKey(
        //updateFunc
        (orders: Seq[Order], state: Option[Float]) => {
          //获取当前省份传递进来的订单营业额
          val currentOrdersPrice = orders.map(_.price).sum
          //获取当前省份以前的营业额
          val previousPrice = state.getOrElse(0.0F)
          //累加
          Some(currentOrdersPrice + previousPrice)
        }
      )
    /**
     * 3.将各个省份实时统计的营业额进行输出
     */
    orderTurnoverDStream.foreachRDD(
      (rdd, time) => {
        //格式化时间
        val bacthInterval = new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss").format(new Date(time.milliseconds))
        println("--------------------------------------")
        println(s"bacthInterval :${bacthInterval}")
        println("--------------------------------------")
        rdd.coalesce(1)
          .foreachPartition(iter =>{iter.foreach(println)})
      })
  }

}

//创建ObjectMapper单例对象
object ObjectMapperSingleton {
  /**
   * 声明对象是使用transient 有如下含义
   * 1.当对象被序列化的时候,transient 阻止实例中哪些关键字声明的变量持久化
   * 2.当对象被反序列化的时候,实例变量不会被持久化和恢复
   */

  @transient private var instance: ObjectMapper = _

  def getInstance(): ObjectMapper = {
    if (instance == null) {
      instance = new ObjectMapper()
      instance.registerModule(DefaultScalaModule)
    }
    instance
  }


}




第五步:优化实时程序设置参数(处理条目数、序列化等)

  //检查点目录修改
  val CHECKPOINT_DIRECTORY:String = "hdfs://bigdata02:9000/sparkckpt1028"
      //TODO:设置从kafka topic 中每秒钟获取每个分区数据最多的条目数
      sparkConf.set("spark.streaming.kafka.maxRatePerPartition","10000")

      //TODO:设置序列化方式
      sparkConf.set("spark.serializer "," org.apache.spark.serializer.KryoSerializer")
      sparkConf.registerKryoClasses(Array(classOf[Order]))

第六步:集成SparkSQL分析基于窗口Window的每十秒的各个省份的订单量

OrderQuantityStreaming.scala

  import java.text.SimpleDateFormat
  import java.util.Date

  import com.fasterxml.jackson.databind.ObjectMapper
  import com.fasterxml.jackson.module.scala.DefaultScalaModule
  import kafka.serializer.StringDecoder
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.streaming.kafka.KafkaUtils
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}

  //从kafka获取数据,统计订单量和订单总值
  object OrderQuantityStreaming {

    System.setProperty("HADOOP_USER_NAME", "bigdata")

    val BATCH_INTERVAL: Duration = Seconds(2)
    val WINDOW_INTERVAL:Duration = BATCH_INTERVAL*5
    val SLIDER_INTERVAL:Duration = BATCH_INTERVAL*3

    //检查点目录
    val CHECKPOINT_DIRECTORY:String = "hdfs://bigdata02:9000/sparkckpt10282"

    //贷出函数
    def sparkOperation(args: Array[String])(operation: StreamingContext => Unit) = {
      //创建StreamingContext对象的函数
      def functionToCreateContext(): StreamingContext = {

        //创建配置对象
        val sparkConf = new SparkConf()
          //应用名称
          .setAppName("OrderTurnoverStreaming")
          //设置运行模式
          .setMaster("local[3]")

        //TODO:设置从kafka topic 中每秒钟获取每个分区数据最多的条目数
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition","10000")

        //TODO:设置序列化方式
        sparkConf.set("spark.serializer "," org.apache.spark.serializer.KryoSerializer")
        sparkConf.registerKryoClasses(Array(classOf[Order]))

        //创建SparkContext上下文对象
        val sc = SparkContext.getOrCreate(sparkConf)
        //设置日志级别
        sc.setLogLevel("WARN")
        //创建ssc
        val ssc: StreamingContext =  new StreamingContext(sc, BATCH_INTERVAL)
        //调用用户函数
        operation(ssc)
        //确保交互式查询时候,数据不被删除
        ssc.remember(Minutes(1))
        //设置一下检查点目录
        ssc.checkpoint(CHECKPOINT_DIRECTORY)
        //返回ssc对象
        ssc

      }

      var context:StreamingContext = null
      try{
        //stop any esxiting StreamingContext
        val stopActiveContext = true
        if (stopActiveContext) {
          StreamingContext.getActive().foreach(_.stop(stopSparkContext = true))
        }

        // context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
        context = StreamingContext.getActiveOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)

        //设置日志级别 ,如果从检查点恢复创建StreamingContext,日志级别不生效,就重新设置
        context.sparkContext.setLogLevel("WARN")
        //start计算
        context.start()
        //wait 停止
        context.awaitTermination()
      }catch {
        case e: Exception => e.printStackTrace()
      } finally {
        context.stop(stopSparkContext = true,stopGracefully = true)
      }

    }

    def main(args: Array[String]): Unit = {
      sparkOperation(args)(processOrderData)
    }

    //用户函数   真正编写程序逻辑的地方
    def processOrderData(ssc: StreamingContext): Unit = {
      /**
       * 1.从Kafka topic 里面获取数据  direct模式
       */

      //设置kafka连接的相关信息(第一个参数)
      val kafkaParams: Map[String, String] = Map(
        "metadata.broker.list" -> ConstantUtils.METADATA_BROKER_LIST,
        "auto.offset.reset" -> ConstantUtils.AUTO_OFFSET_RESET
      )
      //Topics(第二参数)
      val topicsSet = ConstantUtils.ORDER_TOPIC.split(",").toSet

      //采用直接方式从kafka topic里面读取数据
      val orderDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

      //需求二开始
      /**
       * 统计每10秒钟各个省份的订单量
       */
      orderDStream
        //设置窗口长度,滑动距离
        .window(WINDOW_INTERVAL,SLIDER_INTERVAL)
        .foreachRDD((rdd,time)=>{
          //格式化时间
          val sliderInterval = new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss").format(new Date(time.milliseconds))
          println("--------------------------------------")
          println(s"sliderInterval :${sliderInterval}")
          println("--------------------------------------")
          //判断一下RDD是否有数据
          if(!rdd.isEmpty()){
            val orderRDD = rdd.map(tuple => {
              //解析成Json格式数据
              ObjectMapperSingleton.getInstance().readValue(tuple._2, classOf[Order])
            })

            //创建SparkSession对象
            val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)

            //转换成DataFrame
            import spark.implicits._
            val orderDS = orderRDD.toDS()

            //使用DSL分析
            val provinceCountDF = orderDS.groupBy("provinceId").count()

            //打印
            provinceCountDF.show(15,truncate=false)

          }
        })

    }

  }

  //创建ObjectMapper单例对象
  object ObjectMapperSingleton {
    /**
     * 声明对象是使用transient 有如下含义
     * 1.当对象被序列化的时候,transient 阻止实例中哪些关键字声明的变量持久化
     * 2.当对象被反序列化的时候,实例变量不会被持久化和恢复
     */

    @transient private var instance: ObjectMapper = _

    def getInstance(): ObjectMapper = {
      if (instance == null) {
        instance = new ObjectMapper()
        instance.registerModule(DefaultScalaModule)
      }
      instance
    }


  }

  //创建Sparksession单例对象
  object SparkSessionSingleton {
    @transient private var instance: SparkSession = _

    def getInstance(sparkconf:SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession.builder().config(sparkconf).getOrCreate()
      }
      instance
    }
  }

作者:羋虹光

原文链接:https://www.jianshu.com/p/f04e4eaba44c

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐