基于redis实现的轻量级延迟队列
👉 这是一个或许对你有用的社群
🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入「芋道快速开发平台」知识星球。下面是星球提供的部分资料:
《项目实战(视频)》:从书中学,往事上“练” 《互联网高频面试题》:面朝简历学习,春暖花开 《架构 x 系统设计》:摧枯拉朽,掌控面试高频场景题 《精进 Java 学习指南》:系统学习,互联网主流技术栈 《必读 Java 源码专栏》:知其然,知其所以然
👉这是一个或许对你有用的开源项目
国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。
功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号、CRM 等等功能:
Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud 视频教程:https://doc.iocoder.cn 【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本
延迟队列的工作原理 延迟队列的使用场景 延迟队列常见的实现方式 JDK 自带的延迟队列 DelayQueue MQ 实现方式 Redis 实现方式 lmstfy lmstfy特点 lmstfy工作原理 lmstfy服务端安装部署
什么是延迟队列
延迟队列 (Delay Queue)是一种存储消息并在特定延迟时间后将其投递到消费者的队列机制。在传统消息队列中,消息会立即被推送给消费者进行处理,但在某些场景下,我们希望消息在一段时间后再被消费者处理。
延迟队列的工作原理
延迟队列的工作原理通常是将消息先存储在队列中,消息的投递时间会被延迟,直到延迟时间到达时才将消息投递给消费者进行处理。这种机制保证了消息能够在指定的延迟时间后才会被处理,实现了消息的延迟投递。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/ruoyi-vue-pro 视频教程:https://doc.iocoder.cn/video/
延迟队列的使用场景
电商平台,超过 30 分钟未支付的订单,将会被取消 商品签收后,3天未确认,自动确认 在平台注册但 30 天内未登录的用户,发短信提醒 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议 商品抢购,预定后,开售前30分钟提醒
场景很多,就不一一列举,这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务。这些就可以使用延迟队列来解决
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/yudao-cloud 视频教程:https://doc.iocoder.cn/video/
延迟队列常见的实现方式
通过程序的方式实现,例如 [JDK] 自带的延迟队列 DelayQueue 通过 [MQ] 框架来实现,例如 [RabbitMQ] 可以通过 rabbitmq-delayed-message-exchange 插件来实现延迟队列 通过 Redis 的方式来实现延迟队列
JDK 自带的延迟队列 DelayQueue
优点
开发比较方便,可以直接在代码中使用 代码实现比较简单
缺点
不支持持久化保存 不支持分布式系统
MQ 实现方式
[RabbitMQ]本身并不支持延迟队列,但可以通过添加插件 rabbitmq-delayed-message-exchange 来实现延迟队列。
优点
支持分布式 支持持久化
缺点
框架比较重,需要搭建和配置 MQ。
Redis 实现方式
[Redis] 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Score 属性可以用来存储延迟执行的时间。
优点
灵活方便,[Redis]是互联网公司的标配,无序额外搭建相关环境; 可进行消息持久化,大大提高了延迟队列的可靠性; 分布式支持,不像 JDK 自身的 DelayQueue; 高可用性,利用 Redis 本身高可用方案,增加了系统健壮性。
缺点
需要使用无限循环的方式来执行任务检查,会消耗少量的系统资源。
综合以上,我们决定使用redis去实现延迟队列,刚好有个开源项目lmstfy基于redis实现的,下面介绍下这个项目的使用
lmstfy
美图开源的 lmstfy(https://github.com/bitleak/lmstfy)(let me schedule task for you),基于 redis 存储,使用 golang 开发,资源占用少,轻量级,并且经受美图线上环境大流量验证多年,较适合用来做延迟队列。
lmstfy特点
支持基本的消息队列特性:发布消息,消费消息,删除消息; 支持以下额外特性 支持消息有效期,过期自动删除; 支持消息延迟消费; 支持消息自动重试 支持死信队列 支持命名空间,队列级别的[prometheus] 监控,提供grafana dashboard; 支持发布,消费流量控制。
lmstfy工作原理
如上图所示:
消息经过生产者发布到lmstfy,如果delay时间大于0,则消息被扔到[redis]的ZSet中(score为延迟绝对时间),等延迟时间到了,才会挪到就绪队列;如果delay为0,则消息直接到就绪队列。 消费者每次都是从就绪队列消费消息; 服务器内部会有Timer,每隔一秒检查ZSet中是否有到期的消息,有则将消息移到就绪队列供消费者消费; 每一个消息发布的时候,可以设置最多被消费的次数,如果达到最大次数都还未正确处理(delete或者ack),则消息会被挪到死信队列; 业务可以选择复活死信队列中的消息,也可以选择直接删除死信队列中的消息;被复活的死信消息会再次挪到就绪队列中,可以被消费者正常消息。
lmstfy服务端安装部署
前提
lmstfy是依赖[redis]实现的延迟队列,所以首先要安装redis,redis的安装教程可以谷歌搜一搜, 只需要注意一点,redis的持久化,使用AOF 即可,淘汰策略使用noeviction 不淘汰任何数据,当内存不足时,新增操作会报错
reids的配置:
# 持久化设置为aof
appendonly yes
# 内存淘汰策略设置为 noeviction
maxmemory-policy noeviction
编译二进制文件
#下载代码
git clone https://github.com/bitleak/lmstfy.git
# 进入项目目录
cd lmstfy
# 使用make编译项目,当前目录会生成一个二进制文件,文件所在目录./_build
make```
### lmstfy的配置文件
```Host = "127.0.0.1"
Port = 7776
AdminHost = "127.0.0.1" # optional, default to localhost
AdminPort = 7778
#LogDir = "/var/log/lmstfy"
LogLevel = "info"
#LogFormat = "text" # Use LogFormat="json" if wants to print the log with json format
EnableAccessLog = true
## [default params](https://mp.weixin.qq.com/s/HNNA9lr7eWsmMfw23PwGIQ)
#TTLSecond = 24 * 60 * 60 // 1 day
#DelaySecond = 0
#TriesNum = 1
#TTRSecond = 2 * 60 // 2 minutes
#TimeoutSecond = 0 // means non-blocking
## [basic auth accounts for the admin api](https://mp.weixin.qq.com/s/HNNA9lr7eWsmMfw23PwGIQ)
[Accounts]
test_user = "change.me"
[AdminRedis] # redis used to store admin data, eg. tokens
Addr = "localhost:6379"
## [Password = foobared](https://mp.weixin.qq.com/s/HNNA9lr7eWsmMfw23PwGIQ)
[Pool]
[Pool.default]
Addr = "localhost:6379"
## [Password = foobared](https://mp.weixin.qq.com/s/HNNA9lr7eWsmMfw23PwGIQ)
## [DB = 0](https://mp.weixin.qq.com/s/HNNA9lr7eWsmMfw23PwGIQ)
#MigrateTo = "migrate" # When migration is enabled, all PUBLISH will go to `migrate` pool. and `default` will be drained
#[Pool.migrate]
#Addr = "localhost:6389"
#[Pool.mysentinel]
## [Addr = "localhost:16379,localhost:6380,localhost:6381"](https://mp.weixin.qq.com/s/HNNA9lr7eWsmMfw23PwGIQ)
## [MasterName = "mymaster"](https://mp.weixin.qq.com/s/HNNA9lr7eWsmMfw23PwGIQ)
## [Password = foobared](https://mp.weixin.qq.com/s/HNNA9lr7eWsmMfw23PwGIQ)
启动lmsfty
_build/lmstfy-server -c config/demo-conf.toml
以上步骤lmsfty服务器端就启动完成了,那客户端怎么使用呢?
lmstfy客户端
申请token
imstfy使用namespace做业务隔离,每个业务一个[token] ,需要向服务端申请,如果服务端开启了basic auth accounts验证,调用接口的时候 需要传入配置文件的账号,例如上面的配置,调用的时候需要传入”change.me“
例子:namespace为kb-test,申请[token]
curl --location 'http://127.0.0.1:7778/token/kb-test' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--header 'Authorization: Basic dGVzdF91c2VyOmNoYW5nZS5tZQ==' \
--data-urlencode 'description=test'
postman调用
结果 :
{
"token": "01HQJ7EQBDZT88JH79BDE4FAYC"
}
官方目前支持的语言有 go、[Java] 、php、Rust,我这里使用 go 简单演示下如果使用。
生产者
引入github.com/bitleak/lmstfy/client这个包,代码如下:
package main
import (
"fmt"
"github.com/bitleak/lmstfy/client"
)
func main() {
c := client.NewLmstfyClient("127.0.0.1", 7776, "kb-test", "01HQJ7EQBDZT88JH79BDE4FAYC")
c.ConfigRetry(3, 50) // optional, config the client to retry when some errors happened. retry 3 times with 50ms interval
//Publish a job with ttl==forever, tries==3, delay==5s ,注意ttl设置的时间一定要大于delay
jobId, err := c.Publish("test", []byte("test"), 100, 3, 30)
if err == nil {
fmt.Println("消息发送成功", jobId)
}
}
消费者
package main
import (
"fmt"
"github.com/bitleak/lmstfy/client"
)
func main() {
c := client.NewLmstfyClient("127.0.0.1", 7776, "kb-test", "01HQJ7EQBDZT88JH79BDE4FAYC")
c.ConfigRetry(3, 50) // optional, config the client to retry when some errors happened. retry 3 times with 50ms interval
for {
job, err := c.Consume("test", 6, 3)
if err != nil {
panic(err)
}
if job != nil {
fmt.Println(string(job.Data))
//接收到消息,成功处理了, 对消息进行ack去人
err1 := c.Ack("test", job.ID)
if err1 == nil {
fmt.Println("消息出来成功并且确认")
}
}
}
}
以上 就是 lmstfy 的介绍以及使用,简单比较轻量,其他语言,可以看 github 上的示例。
欢迎加入我的知识星球,全面提升技术能力。
👉 加入方式,“长按”或“扫描”下方二维码噢:
星球的内容包括:项目实战、面试招聘、源码解析、学习路线。
文章有帮助的话,在看,转发吧。
谢谢支持哟 (*^__^*)
微信扫码关注该文公众号作者