目录

RabbitMQ四五事之死信队列

业务需求

有时候我们需要某些任务定时执行,譬如取消订单,5分钟没支付,这个订单就被取消。简单实现的话,我们可以使用Redis或Linux的crontab来实现,而对于RabbitMQ,我们则可以用它的死信队列来实现定时任务。

DLX

RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。
消息变成死信一般是以下几种情况:

  • 消息被拒绝,并且设置 requeue 参数为 false
  • 消息过期
  • 队列达到最大长度

DLX其实就是一个普通的交换器,要使用它也很简单,就是在声明某个队列的时候设置其 deadLetterExchangedeadLetterRoutingKey 参数,deadLetterRoutingKey 参数可选,表示为 DLX 指定的路由键,如果没有特殊指定,则使用原队列的路由键。这样设置后,这个队列的消息一过期,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

简单例子

用之前文章RabbitMQ二三事快速启动RabbitMQ的服务,再把RabbitMQ三四事的代码改造下。
producer.js

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
const config = require("./config");
const amqp = require('amqplib');

async function getMQConnection() {
    return await amqp.connect({
        protocol: 'amqp',
        hostname: config.host,
        port: config.port,
        username: config.user,
        password: config.pass,
        locale: 'en_US',
        frameMax: 0,
        heartbeat: 5, // 心跳
        vhost: config.vhost,
    })
}

async function run(rmqConn, msgObj) {
    const noramlQueue = 'noramlQu';
    const noramlExchange = 'noramlEx';
    const exchangeDLX = 'testExDLX';
    const queueDLX = 'testQueueDLX';
    const routingKeyDLX = 'testRoutingKeyDLX';

    try {
        const channel = await rmqConn.createChannel();

        // 声明死信交换器和队列,就跟普通的一样
        await channel.assertExchange(exchangeDLX, 'direct', { durable: true, autoDelete: false });
        await channel.assertQueue(queueDLX, {durable: true, autoDelete: false, });
        await channel.bindQueue(queueDLX, exchangeDLX, routingKeyDLX);


        // 普通交换器和队列
        await channel.assertExchange(noramlExchange, 'direct', { durable: true, autoDelete: false })
        await channel.assertQueue(noramlQueue, {durable: true, autoDelete: false,
            deadLetterExchange: exchangeDLX,
            deadLetterRoutingKey: routingKeyDLX,
        });  // 队列设置DLX
        await channel.bindQueue(noramlQueue, noramlExchange, noramlQueue);

        // 发送消息
        await channel.publish(noramlExchange, noramlQueue, Buffer.from(msgObj.content), {
            expiration: msgObj.expiration, // 过期时间,ms
            persistent: true, 
            mandatory: true,
        });
        console.log('send message successfully.')
        await channel.close();
    } catch(err) {
        console.log('send message failed:' + err.message)
    }
}

async function testSend() {
    const conn = await getMQConnection()
    await run(conn, {
        content: (new Date()).toLocaleString(),
        expiration: '3000',
    })
    await conn.close()
}
testSend();

consumer.js

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
const config = require("./config");
const amqp = require('amqplib');

async function getMQConnection() {
    return await amqp.connect({
        protocol: 'amqp',
        hostname: config.host,
        port: config.port,
        username: config.user,
        password: config.pass,
        locale: 'en_US',
        frameMax: 0,
        heartbeat: 5, // 心跳
        vhost: config.vhost,
    })
}

async function run(rmqConn) {
    const noramlQueue = 'noramlQu';
    const noramlExchange = 'noramlEx';
    const exchangeDLX = 'testExDLX';
    const queueDLX = 'testQueueDLX';
    const routingKeyDLX = 'testRoutingKeyDLX';

    try {
        const channel = await rmqConn.createChannel();

        // 声明死信交换器和队列,就跟普通的一样
        await channel.assertExchange(exchangeDLX, 'direct', { durable: true, autoDelete: false });
        await channel.assertQueue(queueDLX, {durable: true, autoDelete: false, });
        await channel.bindQueue(queueDLX, exchangeDLX, routingKeyDLX);

        // 处理死信队列消息
        await channel.consume(queueDLX, msg => {
            console.log(`[${(new Date()).toLocaleString()}] consumer msg:`, msg.content.toString());
        }, { noAck: true });
    } catch(err) {
        console.log('consume message failed:' + err.message)
    }
}

async function testConsume() {
    const conn = await getMQConnection();
    console.log('begin consuming messages...');
    await run(conn);

    process.on('SIGINT', () => {
        console.log('stop consumer.')
        conn.close();
    });
}
testConsume();

config.js

1
2
3
4
5
6
7
module.exports = {
    host: '127.0.0.1',
    port: 5672,
    user: 'test',
    pass: '************',
    vhost: '/'
}

上面的代码,我们让消息3s后过期,先启动消费者,再启动生产者,我们可以看到消息3s后过期:

1
2
3
$ node consumer.js 
begin consuming messages...
[2021/3/20 下午3:22:29] consumer msg: 2021/3/20 下午3:22:26

死信队列问题

RabbitMQ中,每个消息的过期不是独立的,一个队列里的某个消息即使比同队列中的其它消息提前过期,也不会优先进入到死信队列,只有当过期的消息到了队列的顶端,才会被真正的丢弃或者进入死信队列
我们把生产者的代码调整下,先发一个20s过期的消息,再发一个3s过期的消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
....
async function testSend() {
    const conn = await getMQConnection()
    await run(conn, {
        content: (new Date()).toLocaleString() + ' 20s过期 ',
        expiration: '20000',
    })
    await run(conn, {
        content: (new Date()).toLocaleString() + ' 3s过期 ',
        expiration: '3000',
    })
    await conn.close()
}

观察消费者输出:

1
2
3
4
$ node consumer.js 
begin consuming messages...
[2021/3/21 下午6:10:21] consumer msg: 2021/3/21 下午6:10:01 20s过期 
[2021/3/21 下午6:10:21] consumer msg: 2021/3/21 下午6:10:01 3s过期

可以发现,3s过期的消息并没有先被消费,而是只能前面的20s过期的消息先过期,它才会被检查是否过期。
究其本质的话,RabbitMQ 的队列是一个 FIFO 的有序队列,投入的消息都顺序的压进 MQ 中。而 RabbitMQ 也只会对队列顶端的消息进行超时判定,所以就出现了上述的情况。
所以对于固定时间的延时任务的话,例如下单后半小时未支付则关闭订单这种场景,RabbitMQ无疑可以很好的承担起这个需求,但对于需要每条消息的死亡相互独立这种场景,RabbitMQ还是无法满足的。

解决队列消息非异步

RabbitMQ 本身没有这种功能,但是它有个插件可以解决这个问题:rabbitmq_delayed_message_exchange地址
这个插件的介绍如下:

A user can declare an exchange with the type x-delayed-message and then publish messages with the custom header x-delay expressing in milliseconds a delay time for the message. The message will be delivered to the respective queues after x-delay milliseconds.

这个插件增加了一种新类型的exchangex-delayed-message,然后只要发送消息时指定的是这个交换机,那么只需要在消息 header 中指定参数 x-delay [:毫秒值] 就能够实现每条消息的异步延时。

添加插件

用了Docker之后,添加这个插件非常简单,添加Dockerfile

1
2
3
FROM rabbitmq:3.8.12-management
COPY ./rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez /plugins
RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange

插件可以在Release页下载。
docker-compose.yml改下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
version: "2"
services:
  mq:
    build: .
    restart: always
    mem_limit: 2g
    hostname: mq1
    volumes:
      - ./mnesia:/var/lib/rabbitmq/mnesia
      - ./log:/var/log/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    ports:
      - "55672:15672"
      - "5672:5672"
    environment:
      - CONTAINER_NAME=rabbitMQ
      - RABBITMQ_ERLANG_COOKIE=3t182q3wtj1p9z0kd3tb

这样插件就安装成功了。

修改代码

producer.js

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
const config = require("./config");
const amqp = require('amqplib');

async function getMQConnection() {
    return await amqp.connect({
        protocol: 'amqp',
        hostname: config.host,
        port: config.port,
        username: config.user,
        password: config.pass,
        locale: 'en_US',
        frameMax: 0,
        heartbeat: 5, // 心跳
        vhost: config.vhost,
    })
}

async function run(rmqConn, msgObj) {
    const exchangeDelay = 'testExNewDelay';
    const queueDLX = 'testQueueDLX';
    const routingKeyDLX = 'testRoutingKeyDLX';

    try {
        const channel = await rmqConn.createChannel();

        // x-delayed-message类型的exchange
        await channel.assertExchange(exchangeDelay, 'x-delayed-message', { durable: true, autoDelete: false, arguments: {'x-delayed-type':  "direct"} });
        await channel.assertQueue(queueDLX, {durable: true, autoDelete: false, });
        await channel.bindQueue(queueDLX, exchangeDelay, routingKeyDLX);

        // 发送消息
        await channel.publish(exchangeDelay, routingKeyDLX, Buffer.from(msgObj.content), {
            headers: {"x-delay": msgObj.expiration}, // ms
            persistent: true,
            mandatory: true,
        });
        console.log('send message successfully.')
        await channel.close();
    } catch(err) {
        console.log('send message failed:' + err.message)
    }
}

async function testSend() {
    const conn = await getMQConnection()
    await run(conn, {
        content: (new Date()).toLocaleString() + ' 20s过期 ',
        expiration: '20000',
    })
    await run(conn, {
        content: (new Date()).toLocaleString() + ' 3s过期 ',
        expiration: '3000',
    })
    await conn.close()
}
testSend();

x-delayed-type告诉插件在给定的延迟时间过去之后,exchange应该跟directfanouttopic中的exchange路由功能一样。

consumer.js

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const config = require("./config");
const amqp = require('amqplib');

async function getMQConnection() {
    return await amqp.connect({
        protocol: 'amqp',
        hostname: config.host,
        port: config.port,
        username: config.user,
        password: config.pass,
        locale: 'en_US',
        frameMax: 0,
        heartbeat: 5, // 心跳
        vhost: config.vhost,
    })
}

async function run(rmqConn) {
    const exchangeDelay = 'testExNewDelay';
    const queueDLX = 'testQueueDLX';
    const routingKeyDLX = 'testRoutingKeyDLX';

    try {
        const channel = await rmqConn.createChannel();

        // x-delayed-message类型的exchange
        await channel.assertExchange(exchangeDelay, 'x-delayed-message', { durable: true, autoDelete: false, arguments: {'x-delayed-type':  "direct"} });
        await channel.assertQueue(queueDLX, {durable: true, autoDelete: false, });
        await channel.bindQueue(queueDLX, exchangeDelay, routingKeyDLX);

        // 处理死信队列消息
        await channel.consume(queueDLX, msg => {
            console.log(`[${(new Date()).toLocaleString()}] consumer msg:`, msg.content.toString());
        }, { noAck: true });
    } catch(err) {
        console.log('consume message failed:' + err.message)
    }
}

async function testConsume() {
    const conn = await getMQConnection();
    console.log('begin consuming messages...');
    await run(conn);

    process.on('SIGINT', () => {
        console.log('stop consumer.')
        conn.close();
    });
}

testConsume();

测试效果

执行生产者代码之后,我们可以看到消费者输出:

1
2
3
4
$ node consumer.js 
begin consuming messages...
[2021/3/22 下午2:16:37] consumer msg: 2021/3/22 下午2:16:34 3s过期 
[2021/3/22 下午2:16:54] consumer msg: 2021/3/22 下午2:16:34 20s过期

可以发现,消息已经独立的过期了。

局限性

没有什么东西是完美的,这个插件也不例外。看下这个插件的Performance Impact部分:

1
2
3
For each message that crosses an "x-delayed-message" exchange, 
the plugin will try to determine if the message has to be expired by making sure the delay is within range, 
ie: Delay > 0, Delay =< ?ERL_MAX_T (In Erlang a timer can be set up to (2^32)-1 milliseconds in the future).

延迟时间最大为 (2^32)-1 毫秒,大约 49 天。另外这个插件也不适合大量延迟消息(例如数十万或数百万)的场景,Limitations也写了:

1
2
Current design of this plugin doesn't really fit scenarios with a high number of delayed messages 
(e.g. 100s of thousands or millions). See #72 for details.

参考: