阅读 280

go-redis实现客户端消息发布与订阅

一. 前言

在数据量较小的情况下,可以使用Redis来实现消息的发布与订阅,来代替KafkaKafka对于数据量大的场景下性能卓越,但是对于如此小场景时候,不仅运维成本提升,还用不上多少性能。

不过使用Redis的另一个弊端是消息不能堆积,一旦消费者节点没有消费消息,消息将会丢失。因此需要评估当下场景来选择适合的架构。

此处使用go-redis来实现Redis的发布与订阅。

二. 官方文档

官方文档有较为完整的例子:

pubsub := rdb.Subscribe(ctx, "mychannel1") // Wait for confirmation that subscription is created before publishing anything. _, err := pubsub.Receive(ctx) if err != nil { panic(err) } // Go channel which receives messages. ch := pubsub.Channel() // Publish a message. err = rdb.Publish(ctx, "mychannel1", "hello").Err() if err != nil { panic(err) } time.AfterFunc(time.Second, func() { // When pubsub is closed channel is closed too. _ = pubsub.Close() }) // Consume messages. for msg := range ch { fmt.Println(msg.Channel, msg.Payload) } 复制代码

三. 代码实现

分步讲解下具体实现代码。

1. 连接redis

func redisConnect() (rdb *redis.Client) { var ( redisServer string port        string password    string ) redisServer = os.Getenv("RedisUrl") port = os.Getenv("RedisPort") password = os.Getenv("RedisPass") rdb = redis.NewClient(&redis.Options{ Addr:     redisServer + ":" + port, Password: password, DB:       0, // use default DB }) return } 复制代码

2. 发布消息

func pubMessage(channel, msg string) { rdb := redisConnect() rdb.Publish(context.Background(), channel, msg) } 复制代码

3. 订阅消息

func subMessage(channel string) { rdb := redisConnect() pubsub := rdb.Subscribe(context.Background(), channel) _, err := pubsub.Receive(context.Background()) if err != nil { panic(err) } ch := pubsub.Channel() for msg := range ch { fmt.Println(msg.Channel, msg.Payload) } } 复制代码

四. 完整案例

此处分为一个发布节点和一个订阅节点来实现了简单的发布与订阅。

1. 消息发布节点

package main import ( "context" "fmt" "os" "github.com/go-redis/redis/v8" ) func redisConnect() (rdb *redis.Client) { var ( redisServer string port        string password    string ) redisServer = os.Getenv("RedisUrl") port = os.Getenv("RedisPort") password = os.Getenv("RedisPass") rdb = redis.NewClient(&redis.Options{ Addr:     redisServer + ":" + port, Password: password, DB:       0, // use default DB }) return } func pubMessage(channel, msg string) { rdb := redisConnect() rdb.Publish(context.Background(), channel, msg) } func main() { channel := "hello" msgList := []string{"hello", "world"}   // 此处发了两个消息 for _, msg := range msgList { pubMessage(channel, msg) fmt.Printf("已经发送%s到%s\n", msg, channel) } } 复制代码

2. 消息订阅节点

package main import ( "context" "fmt" "os" "github.com/go-redis/redis/v8" ) func redisConnect() (rdb *redis.Client) { var ( redisServer string port        string password    string ) redisServer = os.Getenv("RedisUrl") port = os.Getenv("RedisPort") password = os.Getenv("RedisPass") rdb = redis.NewClient(&redis.Options{ Addr:     redisServer + ":" + port, Password: password, DB:       0, // use default DB }) return } func subMessage(channel string) { rdb := redisConnect() pubsub := rdb.Subscribe(context.Background(), channel) _, err := pubsub.Receive(context.Background()) if err != nil { panic(err) } ch := pubsub.Channel() for msg := range ch { fmt.Println(msg.Channel, msg.Payload) } } func main() {     channel := "hello"     subMessage(channel) } 复制代码

五. 运行结果

1. 消息发布节点输出

100434315-43e33e00-30d7-11eb-9c35-29fd0a4b9be1.png

2. 消息订阅节点输出

100434353-52c9f080-30d7-11eb-82e4-a99747775c93.png


作者:Kuari
链接:https://juejin.cn/post/7023271670018211877

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐