阅读 193

Spark streaming 输出数据到redis

上次使用了spark streaming读取redis中的数据并进行处理。这次解决一下输出的问题。最简单的输出方式是使用计算的结果对象上自带的print函数,输出到运行的屏幕上。但是我运行的时候使用的是远程的分布式环境。然后用程序推送运行的。因此需要一个比较方便查看运行状况的方式。

尽管spark streaming 额外还提供了一些像saveAsTextFiles这种保存数据为文件或者其他东西的方式,但没有想象中方便。

除了这些之外,spark streaming提供了一个可以用来自定义输出方式的函数foreachRDD。

出于使用简便的目的考虑。我决定使用redis的发布订阅机制,把数据推送到redis,由redis对外广播,后续用其他程序读取这个数据。也就是类似总线的机制。如果这里使用的是kafka这种重量级选手的话,它对数据做持久化之后可以让数据被随时拿出,不像redis的发布订阅只能即时性的使用。不过只看自定义输出这一部分的话,操作原理是一致的。

foreachRDD是基于rdd的一个操作,行为表现与我们在计算操作的部分基本是一致的,这也意味着一个很经典的情况:rdd是分布式的运算。它需要在在不同的机器上运行相同的程序。假设我们给这个函数传递的函数里直接创建redis连接的话,会出现一个序列化错误,因为它需要把这个连接对象序列化之后发送给计算程序,这点无法实现。

同样值得注意的是,如果我们在对rdd中的每个计算结果创建redis连接对象也是不推荐的。虽然这样不会产生先前说的那种序列化的问题,但是每一个记录都建立连接对象这会产生大量的冗余的开销。

合适的做法是利用rdd的分布式特性,rdd使用的也是经典的分区计算模型,每个节点计算的部分数据处于分区中,一般来讲,对于这一个分区的数据,我们建立一个连接对象已经是可以接受的了。我们可以像下面这样完成到redis的数据发送。

behaviorCount.foreachRDD { (rdd) =>     rdd.foreachPartition { partitionOfRecords =>         val jedis = new Jedis("master", 47777)         partitionOfRecords.foreach(it => {             jedis.publish("channel", data string of it)         })         jedis.close()     } } 复制代码

如果追求更近一步效率,还可以使用连接池技术,从连接池中获取连接,进一步降低开销。

还有一个问题则是如何整合数据,分辨数据的计算时间点。这里也可以利用foreachRDD函数。因为它实际上可以有两个参数,在rdd之后还可以接一个表示时间戳的变量,然后同样发送到redis,让其他程序来处理。


作者:纯人工模式混淆
链接:https://juejin.cn/post/7031197529500991518


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐