业务需求 有时候我们需要某些任务定时执行,譬如取消订单,5分钟没支付,这个订单就被取消。简单实现的话,我们可以使用Redis
或Linux的crontab 来实现,而对于RabbitMQ,我们则可以用它的死信队列
来实现定时任务。
DLX RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange
,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。 消息变成死信一般是以下几种情况:
消息被拒绝,并且设置 requeue 参数为 false
消息过期
队列达到最大长度
DLX其实就是一个普通的交换器,要使用它也很简单,就是在声明某个队列的时候设置其 deadLetterExchange
和 deadLetterRoutingKey
参数,deadLetterRoutingKey
参数可选,表示为 DLX 指定的路由键,如果没有特殊指定,则使用原队列的路由键。这样设置后,这个队列的消息一过期,RabbitMQ
就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。
简单例子 用之前文章RabbitMQ二三事 快速启动RabbitMQ的服务,再把RabbitMQ三四事 的代码改造下。producer.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 const config = require ("./config" );const amqp = require ('amqplib' );async function getMQConnection ( ) { return await amqp.connect({ protocol: 'amqp' , hostname: config.host, port: config.port, username: config.user, password: config.pass, locale: 'en_US' , frameMax: 0 , heartbeat: 5 , vhost: config.vhost, }) } async function run (rmqConn, msgObj ) { const noramlQueue = 'noramlQu' ; const noramlExchange = 'noramlEx' ; const exchangeDLX = 'testExDLX' ; const queueDLX = 'testQueueDLX' ; const routingKeyDLX = 'testRoutingKeyDLX' ; try { const channel = await rmqConn.createChannel(); await channel.assertExchange(exchangeDLX, 'direct' , { durable : true , autoDelete : false }); await channel.assertQueue(queueDLX, {durable : true , autoDelete : false , }); await channel.bindQueue(queueDLX, exchangeDLX, routingKeyDLX); await channel.assertExchange(noramlExchange, 'direct' , { durable : true , autoDelete : false }) await channel.assertQueue(noramlQueue, {durable : true , autoDelete : false , deadLetterExchange: exchangeDLX, deadLetterRoutingKey: routingKeyDLX, }); await channel.bindQueue(noramlQueue, noramlExchange, noramlQueue); await channel.publish(noramlExchange, noramlQueue, Buffer.from(msgObj.content), { expiration: msgObj.expiration, persistent: true , mandatory: true , }); console .log('send message successfully.' ) await channel.close(); } catch (err) { console .log('send message failed:' + err.message) } } async function testSend ( ) { const conn = await getMQConnection() await run(conn, { content: (new Date ()).toLocaleString(), expiration: '3000' , }) await conn.close() } testSend();
consumer.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 const config = require ("./config" );const amqp = require ('amqplib' );async function getMQConnection ( ) { return await amqp.connect({ protocol: 'amqp' , hostname: config.host, port: config.port, username: config.user, password: config.pass, locale: 'en_US' , frameMax: 0 , heartbeat: 5 , vhost: config.vhost, }) } async function run (rmqConn ) { const noramlQueue = 'noramlQu' ; const noramlExchange = 'noramlEx' ; const exchangeDLX = 'testExDLX' ; const queueDLX = 'testQueueDLX' ; const routingKeyDLX = 'testRoutingKeyDLX' ; try { const channel = await rmqConn.createChannel(); await channel.assertExchange(exchangeDLX, 'direct' , { durable : true , autoDelete : false }); await channel.assertQueue(queueDLX, {durable : true , autoDelete : false , }); await channel.bindQueue(queueDLX, exchangeDLX, routingKeyDLX); await channel.consume(queueDLX, msg => { console .log(`[${(new Date ()).toLocaleString()} ] consumer msg:` , msg.content.toString()); }, { noAck : true }); } catch (err) { console .log('consume message failed:' + err.message) } } async function testConsume ( ) { const conn = await getMQConnection(); console .log('begin consuming messages...' ); await run(conn); process.on('SIGINT' , () => { console .log('stop consumer.' ) conn.close(); }); } testConsume();
config.js
1 2 3 4 5 6 7 module.exports = { host: '127.0.0.1', port: 5672, user: 'test', pass: '************', vhost: '/' }
上面的代码,我们让消息3s后过期,先启动消费者 ,再启动生产者,我们可以看到消息3s后过期:
1 2 3 $ node consumer.js begin consuming messages... [2021/3/20 下午3:22:29] consumer msg: 2021/3/20 下午3:22:26
死信队列问题 RabbitMQ中,每个消息的过期不是独立的,一个队列里的某个消息即使比同队列中的其它消息提前过期,也不会优先进入到死信队列,只有当过期的消息到了队列的顶端,才会被真正的丢弃或者进入死信队列 。 我们把生产者 的代码调整下,先发一个20s过期的消息,再发一个3s过期的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 .... async function testSend() { const conn = await getMQConnection() await run(conn, { content: (new Date()).toLocaleString() + ' 20s过期 ', expiration: '20000', }) await run(conn, { content: (new Date()).toLocaleString() + ' 3s过期 ', expiration: '3000', }) await conn.close() }
观察消费者 输出:
1 2 3 4 $ node consumer.js begin consuming messages... [2021/3/21 下午6:10:21] consumer msg: 2021/3/21 下午6:10:01 20s过期 [2021/3/21 下午6:10:21] consumer msg: 2021/3/21 下午6:10:01 3s过期
可以发现,3s过期的消息并没有先被消费,而是只能前面的20s过期的消息先过期,它才会被检查是否过期。 究其本质的话,RabbitMQ 的队列是一个 FIFO
的有序队列,投入的消息都顺序的压进 MQ 中。而 RabbitMQ 也只会对队列顶端的消息进行超时判定,所以就出现了上述的情况。 所以对于固定时间的延时任务的话,例如下单后半小时未支付则关闭订单这种场景,RabbitMQ无疑可以很好的承担起这个需求,但对于需要每条消息的死亡相互独立这种场景,RabbitMQ还是无法满足的。
解决队列消息非异步 RabbitMQ 本身没有这种功能,但是它有个插件可以解决这个问题:rabbitmq_delayed_message_exchange ,地址 。 这个插件的介绍如下:
A user can declare an exchange with the type x-delayed-message and then publish messages with the custom header x-delay expressing in milliseconds a delay time for the message. The message will be delivered to the respective queues after x-delay milliseconds.
这个插件增加了一种新类型的exchange
:x-delayed-message ,然后只要发送消息时指定的是这个交换机,那么只需要在消息 header 中指定参数 x-delay [:毫秒值] 就能够实现每条消息的异步延时。
添加插件 用了Docker之后,添加这个插件非常简单,添加Dockerfile
:
1 2 3 FROM rabbitmq:3.8.12-management COPY ./rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez /plugins RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
插件可以在Release页 下载。docker-compose.yml 改下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 version: "2" services: mq: build: . restart: always mem_limit: 2g hostname: mq1 volumes: - ./mnesia:/var/lib/rabbitmq/mnesia - ./log:/var/log/rabbitmq - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf ports: - "55672:15672" - "5672:5672" environment: - CONTAINER_NAME=rabbitMQ - RABBITMQ_ERLANG_COOKIE=3t182q3wtj1p9z0kd3tb
这样插件就安装成功了。
修改代码 producer.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 const config = require ("./config" );const amqp = require ('amqplib' );async function getMQConnection ( ) { return await amqp.connect({ protocol: 'amqp' , hostname: config.host, port: config.port, username: config.user, password: config.pass, locale: 'en_US' , frameMax: 0 , heartbeat: 5 , vhost: config.vhost, }) } async function run (rmqConn, msgObj ) { const exchangeDelay = 'testExNewDelay' ; const queueDLX = 'testQueueDLX' ; const routingKeyDLX = 'testRoutingKeyDLX' ; try { const channel = await rmqConn.createChannel(); await channel.assertExchange(exchangeDelay, 'x-delayed-message' , { durable : true , autoDelete : false , arguments : {'x-delayed-type' : "direct" } }); await channel.assertQueue(queueDLX, {durable : true , autoDelete : false , }); await channel.bindQueue(queueDLX, exchangeDelay, routingKeyDLX); await channel.publish(exchangeDelay, routingKeyDLX, Buffer.from(msgObj.content), { headers: {"x-delay" : msgObj.expiration}, persistent: true , mandatory: true , }); console .log('send message successfully.' ) await channel.close(); } catch (err) { console .log('send message failed:' + err.message) } } async function testSend ( ) { const conn = await getMQConnection() await run(conn, { content: (new Date ()).toLocaleString() + ' 20s过期 ' , expiration: '20000' , }) await run(conn, { content: (new Date ()).toLocaleString() + ' 3s过期 ' , expiration: '3000' , }) await conn.close() } testSend();
x-delayed-type
告诉插件在给定的延迟时间过去之后,exchange应该跟direct
,fanout
,topic
中的exchange路由功能一样。
consumer.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 const config = require ("./config" );const amqp = require ('amqplib' );async function getMQConnection ( ) { return await amqp.connect({ protocol: 'amqp' , hostname: config.host, port: config.port, username: config.user, password: config.pass, locale: 'en_US' , frameMax: 0 , heartbeat: 5 , vhost: config.vhost, }) } async function run (rmqConn ) { const exchangeDelay = 'testExNewDelay' ; const queueDLX = 'testQueueDLX' ; const routingKeyDLX = 'testRoutingKeyDLX' ; try { const channel = await rmqConn.createChannel(); await channel.assertExchange(exchangeDelay, 'x-delayed-message' , { durable : true , autoDelete : false , arguments : {'x-delayed-type' : "direct" } }); await channel.assertQueue(queueDLX, {durable : true , autoDelete : false , }); await channel.bindQueue(queueDLX, exchangeDelay, routingKeyDLX); await channel.consume(queueDLX, msg => { console .log(`[${(new Date ()).toLocaleString()} ] consumer msg:` , msg.content.toString()); }, { noAck : true }); } catch (err) { console .log('consume message failed:' + err.message) } } async function testConsume ( ) { const conn = await getMQConnection(); console .log('begin consuming messages...' ); await run(conn); process.on('SIGINT' , () => { console .log('stop consumer.' ) conn.close(); }); } testConsume();
测试效果 执行生产者 代码之后,我们可以看到消费者 输出:
1 2 3 4 $ node consumer.js begin consuming messages... [2021/3/22 下午2:16:37] consumer msg: 2021/3/22 下午2:16:34 3s过期 [2021/3/22 下午2:16:54] consumer msg: 2021/3/22 下午2:16:34 20s过期
可以发现,消息已经独立的过期了。
局限性 没有什么东西是完美的,这个插件也不例外。看下这个插件的Performance Impact
部分:
1 2 3 For each message that crosses an "x-delayed-message" exchange, the plugin will try to determine if the message has to be expired by making sure the delay is within range, ie: Delay > 0, Delay =< ?ERL_MAX_T (In Erlang a timer can be set up to (2^32)-1 milliseconds in the future).
延迟时间最大为 (2^32)-1 毫秒,大约 49 天。另外这个插件也不适合大量延迟消息(例如数十万或数百万)的场景,Limitations
也写了:
1 2 Current design of this plugin doesn't really fit scenarios with a high number of delayed messages (e.g. 100s of thousands or millions). See #72 for details.
参考: