阅读 163

spark streaming 使用实例:用户行为统计

前面我们已经做好了数据的输入以及数据的输出操作了,现在我们可以开始使用开始 spark streaming 的一些简单的使用了。这次完成的处理是对一个用户行为数据流来做分析。

数据源的数据原本是一个购物网站的数据集,被我做成了重复利用的数据(循环的读取),它共有五个字段。不过这次用到的只有第 3 个字段(代表用户的行为分类)。这个字段的取值范围是 1 到 4 。他们分别代表点击、收藏、加入购物车以及购买这四种行为。

我们每5秒统计一次数据,为此我们先建立一个相应的对象

val ssc = new StreamingContext(context, Seconds(5)) 复制代码

其中 context 是一个 SparkContext 对象,另一个参数 5s 则是我们的微批的时间间隔。

然后我们从数据源中读取数据,前面我们已经实现过自己的数据源了,这里我们就直接使用,其他数据源也基本同理。

val customReceiverStream = ssc.receiverStream(new CustomReceiver(sys.env("BUSHOST"),sys.env("BUSPORT").toInt )) 复制代码

这里使用了环境变量来读取服务器的地址,使用命令行传参也可行。主要是为了灵活性。至于谁好谁坏就见仁见智了。

现在开始计算的过程。假设我们把输入的数据映射成这样的结构: ( 用户行为, 1 ) ,这样一来, 这里用户行为的统计其实就是典型的group by 然后组内求和的过程。或者说其实就是经典的单词统计模型。

具体代码如下:

val userBehavior = customReceiverStream     .map(str => (str.split(",")(2), 1)) //先前说过的只取第三个值 val behaviorCount = userBehavior.reduceByKey(_ + _) 复制代码

有必要的话可能还需要和事实表关联,也就是把用户行为的种类转换到具体的行为,这里我们就不直接关联了,我们用模式匹配在最后输出操作的时候转换,只模拟下效果。

behaviorCount.foreachRDD { (rdd, time) =>     rdd.foreachPartition { partitionOfRecords =>         partitionOfRecords.foreach(it => {                                 val behavior_type = it._1 match { // fake join                                    case "1" => "click"                                    case "2" => "favorite"                                    case "3" => "cart"                                    case "4" => "buy"                                 }                                //这里输出数据         })     } } 复制代码

如果是redis的话,就可以像前面写的那样处理。 最后程序要运行还需要启动StreamingContext对象

ssc.start() ssc.awaitTermination() 复制代码

运行之后,我们可以在相应的输出流中进行查看。程序实际上比较简单,主要原因是spark steaming 为我们屏蔽了细节,上面的程序中我们的操作就是时间间隔之内的数据计算,它并不需要我们关注更多无关的部分。

这里只是最基础的内容,他还有更多更强大的功能,我们留到后续再介绍。


作者:纯人工模式混淆
链接:https://juejin.cn/post/7031565536278347807

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐