Spark streaming 输出数据到redis
上次使用了spark streaming读取redis中的数据并进行处理。这次解决一下输出的问题。最简单的输出方式是使用计算的结果对象上自带的print函数,输出到运行的屏幕上。但是我运行的时候使用的是远程的分布式环境。然后用程序推送运行的。因此需要一个比较方便查看运行状况的方式。
尽管spark streaming 额外还提供了一些像saveAsTextFiles
这种保存数据为文件或者其他东西的方式,但没有想象中方便。
除了这些之外,spark streaming提供了一个可以用来自定义输出方式的函数foreachRDD。
出于使用简便的目的考虑。我决定使用redis的发布订阅机制,把数据推送到redis,由redis对外广播,后续用其他程序读取这个数据。也就是类似总线的机制。如果这里使用的是kafka这种重量级选手的话,它对数据做持久化之后可以让数据被随时拿出,不像redis的发布订阅只能即时性的使用。不过只看自定义输出这一部分的话,操作原理是一致的。
foreachRDD是基于rdd的一个操作,行为表现与我们在计算操作的部分基本是一致的,这也意味着一个很经典的情况:rdd是分布式的运算。它需要在在不同的机器上运行相同的程序。假设我们给这个函数传递的函数里直接创建redis连接的话,会出现一个序列化错误,因为它需要把这个连接对象序列化之后发送给计算程序,这点无法实现。
同样值得注意的是,如果我们在对rdd中的每个计算结果创建redis连接对象也是不推荐的。虽然这样不会产生先前说的那种序列化的问题,但是每一个记录都建立连接对象这会产生大量的冗余的开销。
合适的做法是利用rdd的分布式特性,rdd使用的也是经典的分区计算模型,每个节点计算的部分数据处于分区中,一般来讲,对于这一个分区的数据,我们建立一个连接对象已经是可以接受的了。我们可以像下面这样完成到redis的数据发送。
behaviorCount.foreachRDD { (rdd) => rdd.foreachPartition { partitionOfRecords => val jedis = new Jedis("master", 47777) partitionOfRecords.foreach(it => { jedis.publish("channel", data string of it) }) jedis.close() } } 复制代码
如果追求更近一步效率,还可以使用连接池技术,从连接池中获取连接,进一步降低开销。
还有一个问题则是如何整合数据,分辨数据的计算时间点。这里也可以利用foreachRDD函数。因为它实际上可以有两个参数,在rdd之后还可以接一个表示时间戳的变量,然后同样发送到redis,让其他程序来处理。
作者:纯人工模式混淆
链接:https://juejin.cn/post/7031197529500991518