消息队列(二):RabbitMQ 深入

写在前面

本文是消息队列系列的第二篇,深入 RabbitMQ 的架构、Exchange 路由、可靠性保证和 .NET 实战。前置知识:消息队列核心概念(第一篇)。


一、RabbitMQ 架构

1.1 整体架构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Producer → Exchange → Binding → Queue → Consumer
                     Routing Key

核心组件:
- Connection    — TCP 连接
- Channel       — 连接内的轻量级通道(一个连接多路复用)
- Virtual Host  — 逻辑隔离(类似数据库的 Schema)
- Exchange      — 消息路由器
- Queue         — 消息存储
- Binding       — Exchange 和 Queue 的绑定规则

1.2 消息流转过程

1
2
3
4
5
6
1. Producer 创建消息,指定 Exchange 和 Routing Key
2. Exchange 根据 Binding 规则路由到一个或多个 Queue
3. Queue 存储消息(如果持久化则写入磁盘)
4. Consumer 从 Queue 中消费消息
5. Consumer 处理完后发送 ACK
6. Queue 收到 ACK 后删除消息

1.3 Virtual Host

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
一个 RabbitMQ 实例可以有多个 VHost,互相隔离:
/         — 默认 VHost
/dev      — 开发环境
/staging  — 预发环境
/prod     — 生产环境

每个 VHost 有独立的:
- Exchange
- Queue
- Binding
- 用户权限

二、Exchange 详解

2.1 Direct Exchange

精确匹配 Routing Key。

1
2
3
4
Producer → [Exchange] → Queue "order-created"(key="order.created")
                        → Queue "order-cancelled"(key="order.cancelled")

只有 Routing Key 完全匹配才会路由到对应 Queue
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 声明 Exchange
channel.ExchangeDeclare("order.exchange", ExchangeType.Direct);

// 声明 Queue
channel.QueueDeclare("order-created", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare("order-cancelled", durable: true, exclusive: false, autoDelete: false, arguments: null);

// 绑定
channel.QueueBind("order-created", "order.exchange", "order.created");
channel.QueueBind("order-cancelled", "order.exchange", "order.cancelled");

// 发送
channel.BasicPublish("order.exchange", "order.created", null, body);

2.2 Fanout Exchange

忽略 Routing Key,广播到所有绑定的 Queue。

1
2
3
4
5
Producer → [Exchange] → Queue A(短信)
                        → Queue B(积分)
                        → Queue C(通知)

所有 Queue 都会收到消息,不管 Routing Key 是什么
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
channel.ExchangeDeclare("order.fanout", ExchangeType.Fanout);

channel.QueueDeclare("sms-queue", durable: true, exclusive: false, autoDelete: false, null);
channel.QueueDeclare("points-queue", durable: true, exclusive: false, autoDelete: false, null);
channel.QueueDeclare("notify-queue", durable: true, exclusive: false, autoDelete: false, null);

// Fanout 绑定不需要 routing key
channel.QueueBind("sms-queue", "order.fanout", "");
channel.QueueBind("points-queue", "order.fanout", "");
channel.QueueBind("notify-queue", "order.fanout", "");

2.3 Topic Exchange

通配符匹配 Routing Key。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Routing Key 格式:用 . 分隔的单词,如 order.created.payment

通配符:
  * 匹配一个单词
  # 匹配零或多个单词

示例:
  order.*       → 匹配 order.created, order.cancelled
  order.#       → 匹配 order.created, order.created.payment, order.cancelled.refund
  *.created     → 匹配 order.created, user.created
  #             → 匹配所有(等同于 Fanout)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
channel.ExchangeDeclare("order.topic", ExchangeType.Topic);

// 短信服务关心所有订单事件
channel.QueueBind("sms-queue", "order.topic", "order.*");

// 财务服务只关心支付相关
channel.QueueBind("finance-queue", "order.topic", "order.*.payment");

// 统计服务关心所有事件
channel.QueueBind("stats-queue", "order.topic", "#");

2.4 Headers Exchange

基于消息头匹配,忽略 Routing Key。

1
2
3
4
5
匹配模式:
- x-match=all    — 所有头部字段都匹配
- x-match=any    — 任一头部字段匹配

适用:复杂的多维度路由(实际使用较少,Topic 通常够用)

2.5 Exchange 类型选择

1
2
3
4
Direct   — 精确匹配,点对点或简单路由
Fanout   — 广播,所有消费者都要
Topic    — 模式匹配,灵活路由(最常用)
Headers  — 复杂条件匹配(性能差,少用)

三、消息可靠性保证

3.1 三个环节

1
2
3
生产端可靠性    — 确保消息成功到达 Broker
Broker 可靠性   — 确保消息在 Broker 中不丢
消费端可靠性    — 确保消息被正确处理

3.2 生产端:Publisher Confirm

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 开启 Confirm 模式
channel.ConfirmSelect();

// 发送消息
channel.BasicPublish("order.exchange", "order.created", null, body);

// 等待确认
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
// 成功 → 继续
// 超时 → 抛异常,可以重试

// 批量确认(性能更好)
for (int i = 0; i < 1000; i++)
{
    channel.BasicPublish("order.exchange", "order.created", null, bodies[i]);
}
channel.WaitForConfirms(); // 一次性确认所有

3.3 生产端:事务(不推荐)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 事务模式(性能差,不推荐)
channel.TxSelect();
try
{
    channel.BasicPublish("order.exchange", "order.created", null, body);
    channel.TxCommit();
}
catch
{
    channel.TxRollback();
}

// Confirm 模式比事务快 10 倍以上,优先用 Confirm

3.4 Broker 端:持久化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
三个层面都要持久化:

1. Exchange 持久化
   channel.ExchangeDeclare("order.exchange", ExchangeType.Direct, durable: true);

2. Queue 持久化
   channel.QueueDeclare("order-queue", durable: true, exclusive: false, autoDelete: false, null);

3. Message 持久化
   var props = channel.CreateBasicProperties();
   props.DeliveryMode = 2;  // 2 = 持久化
   channel.BasicPublish("order.exchange", "order.created", props, body);

注意:持久化影响性能,根据业务需求选择

3.5 消费端:手动 ACK

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 关闭自动确认
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    try
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);

        // 处理业务逻辑
        ProcessOrder(message);

        // 手动确认
        channel.BasicAck(ea.DeliveryTag, multiple: false);
    }
    catch (Exception ex)
    {
        // 处理失败
        // requeue=true → 重新入队(可能无限循环)
        // requeue=false → 丢弃或进死信
        channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
    }
};

// autoAck: false → 手动确认
channel.BasicConsume("order-queue", autoAck: false, consumer);

3.6 可靠性总结

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
消息不丢的完整配置:

生产端:
  ✓ Confirm 模式
  ✓ 持久化消息(DeliveryMode=2)
  ✓ 发送失败重试

Broker:
  ✓ Exchange 持久化(durable=true)
  ✓ Queue 持久化(durable=true)
  ✓ 队列镜像(HA Policy,集群模式)

消费端:
  ✓ 手动 ACK(autoAck=false)
  ✓ 处理完业务再 ACK
  ✓ 失败进死信队列

四、死信队列

4.1 死信产生条件

1
2
3
1. 消费者 nack/reject 且 requeue=false
2. 消息 TTL 过期
3. 队列达到最大长度

4.2 配置死信队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 业务队列:指定死信交换机
var args = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "order.dlx" },
    { "x-dead-letter-routing-key", "order.dead" }
};
channel.QueueDeclare("order-queue", durable: true, exclusive: false, autoDelete: false, args);

// 死信交换机和队列
channel.ExchangeDeclare("order.dlx", ExchangeType.Direct);
channel.QueueDeclare("order-dead-queue", durable: true, exclusive: false, autoDelete: false, null);
channel.QueueBind("order-dead-queue", "order.dlx", "order.dead");

4.3 死信处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 消费死信队列
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);

    // 记录失败原因
    var reason = ea.BasicProperties.Headers["x-death"] as List<object>;
    logger.LogError("消息进入死信:{Message}, 原因:{Reason}", message, reason);

    // 告警通知
    alertService.Notify($"消息处理失败:{message}");

    channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume("order-dead-queue", autoAck: false, consumer);

五、延迟消息

5.1 TTL + DLX 方案

1
2
3
4
5
6
7
8
9
原理:
  消息设置 TTL → 过期后进入死信队列 → 消费者消费死信队列

流程:
  Producer → Queue(TTL=30min, DLX=order.dlx)
                                    ↓ 过期
           order.dlx → dead-queue → Consumer

实现订单 30 分钟未支付自动取消
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 延迟队列:消息过期后进入死信
var delayArgs = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "order.dlx" },
    { "x-dead-letter-routing-key", "order.timeout" },
    { "x-message-ttl", 1800000 }  // 30 分钟(毫秒)
};
channel.QueueDeclare("order-wait-queue", durable: true, exclusive: false, autoDelete: false, delayArgs);

// 死信交换机和消费队列
channel.ExchangeDeclare("order.dlx", ExchangeType.Direct);
channel.QueueDeclare("order-timeout-queue", durable: true, exclusive: false, autoDelete: false, null);
channel.QueueBind("order-timeout-queue", "order.dlx", "order.timeout");

// 发送延迟消息
channel.BasicPublish("", "order-wait-queue", null, orderBody);
// 30 分钟后,消息过期进入 order-timeout-queue 被消费

5.2 延迟消息插件

1
2
# 安装 rabbitmq_delayed_message_exchange 插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 使用延迟插件(更灵活,支持每条消息不同延迟)
var args = new Dictionary<string, object>
{
    { "x-delayed-type", "direct" }
};
channel.ExchangeDeclare("order.delay", "x-delayed-message", durable: true, arguments: args);

// 发送时指定延迟时间
var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>
{
    { "x-delay", 60000 }  // 延迟 60 秒
};
channel.BasicPublish("order.delay", "order.timeout", props, body);

六、.NET 实战

6.1 使用 RabbitMQ.Client

1
dotnet add package RabbitMQ.Client

6.2 封装 Producer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class RabbitMqProducer : IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public RabbitMqProducer(string connectionString)
    {
        var factory = new ConnectionFactory
        {
            Uri = new Uri(connectionString),
            AutomaticRecoveryEnabled = true,     // 自动重连
            NetworkRecoveryInterval = TimeSpan.FromSeconds(5)
        };

        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        // 开启 Confirm 模式
        _channel.ConfirmSelect();
    }

    public void Publish(string exchange, string routingKey, string message)
    {
        var body = Encoding.UTF8.GetBytes(message);

        var props = _channel.CreateBasicProperties();
        props.DeliveryMode = 2;               // 持久化
        props.ContentType = "application/json";
        props.MessageId = Guid.NewGuid().ToString();

        _channel.BasicPublish(exchange, routingKey, props, body);
        _channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
    }

    public void Dispose()
    {
        _channel?.Dispose();
        _connection?.Dispose();
    }
}

6.3 封装 Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class RabbitMqConsumer : IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public RabbitMqConsumer(string connectionString)
    {
        var factory = new ConnectionFactory
        {
            Uri = new Uri(connectionString),
            AutomaticRecoveryEnabled = true,
            DispatchConsumersAsync = true  // 异步消费
        };

        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        // QoS:每次只推 N 条消息(防止消费者积压)
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);
    }

    public void Subscribe(string queue, Func<string, Task> onMessage)
    {
        var consumer = new AsyncEventingBasicConsumer(_channel);

        consumer.Received += async (model, ea) =>
        {
            try
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);

                await onMessage(message);

                _channel.BasicAck(ea.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                // 重试次数判断
                var retryCount = GetRetryCount(ea);
                if (retryCount < 3)
                {
                    // 重新入队
                    _channel.BasicNack(ea.DeliveryTag, false, requeue: true);
                }
                else
                {
                    // 进入死信
                    _channel.BasicNack(ea.DeliveryTag, false, requeue: false);
                }
            }
        };

        _channel.BasicConsume(queue, autoAck: false, consumer);
    }

    private int GetRetryCount(BasicDeliverEventArgs ea)
    {
        if (ea.BasicProperties.Headers?.ContainsKey("x-death") == true)
        {
            var death = ea.BasicProperties.Headers["x-death"] as List<object>;
            return death?.Count ?? 0;
        }
        return 0;
    }

    public void Dispose()
    {
        _channel?.Dispose();
        _connection?.Dispose();
    }
}

6.4 使用 MassTransit(更高级的封装)

1
dotnet add package MassTransit.RabbitMQ
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 注册
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderCreatedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        // 配置重试
        cfg.UseMessageRetry(r => r.Exponential(3,
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(30),
            TimeSpan.FromSeconds(5)));

        // 配置死信
        cfg.ConfigureEndpoints(context);
    });
});

// 消费者
public class OrderCreatedConsumer : IConsumer<OrderCreatedEvent>
{
    private readonly ILogger<OrderCreatedConsumer> _logger;

    public OrderCreatedConsumer(ILogger<OrderCreatedConsumer> logger) => _logger = logger;

    public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
    {
        _logger.LogInformation("收到订单创建事件:{OrderId}", context.Message.OrderId);

        // 处理业务逻辑
        await SendSmsAsync(context.Message);
    }
}

// 生产者
public class OrderService
{
    private readonly IPublishEndpoint _publishEndpoint;

    public OrderService(IPublishEndpoint publishEndpoint) => _publishEndpoint = publishEndpoint;

    public async Task CreateOrderAsync(Order order)
    {
        // 保存订单...

        // 发布事件
        await _publishEndpoint.Publish(new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerEmail = order.CustomerEmail,
            Total = order.Total
        });
    }
}

七、集群和高可用

7.1 集群模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
普通集群:
  - Queue 只存在于一个节点
  - 其他节点可以转发消费请求
  - 节点挂了,Queue 不可用

镜像队列(Classic Queue Mirroring):
  - Queue 在多个节点有副本
  - 主节点写入,同步到从节点
  - 主节点挂了,从节点自动提升

Quorum Queue(仲裁队列,推荐):
  - RabbitMQ 3.8+ 新模式
  - 基于 Raft 协议实现一致性
  - 替代镜像队列,性能和可靠性更好

7.2 Quorum Queue 配置

1
2
3
4
5
6
// 声明仲裁队列
var args = new Dictionary<string, object>
{
    { "x-queue-type", "quorum" }
};
channel.QueueDeclare("order-queue", durable: true, exclusive: false, autoDelete: false, args);

八、性能调优

8.1 生产端优化

1
2
3
4
批量发送        — 减少 RTT,一次发送多条
Confirm 批量确认 — 多条消息一起确认
异步 Confirm    — 注册回调处理 ACK/NACK
连接池           — 复用 Connection 和 Channel

8.2 消费端优化

1
2
3
4
5
6
7
8
Prefetch Count  — 控制一次性推送的消息数
                  太小 → 吞吐量低
                  太大 → 消费者积压
                  建议:10-100,根据处理速度调整

多消费者        — 一个 Queue 多个 Consumer 并行消费
多 Channel      — 一个 Connection 多个 Channel
手动 ACK        — 避免处理一半自动确认

8.3 Broker 端优化

1
2
3
4
消息持久化权衡  — 持久化慢但安全,非持久化快但可能丢
惰性队列        — 消息直接写磁盘,内存占用低,适合百万级堆积
流控            — 内存/磁盘告警时限制生产者发送速度
节点数量        — 3-5 个节点,兼顾可用性和性能

九、运维常用操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 用户管理
rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# 队列管理
rabbitmqctl list_queues name messages consumers
rabbitmqctl list_queues name durable auto_delete

# 交换机
rabbitmqctl list_exchanges name type

# 连接和通道
rabbitmqctl list_connections
rabbitmqctl list_channels

# 集群状态
rabbitmqctl cluster_status

# 插件
rabbitmq-plugins enable rabbitmq_management    # Web 管理界面
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# Web 管理界面
# http://localhost:15672
# 默认账号:guest / guest(只能本地访问)

十、小结

本文深入学习了 RabbitMQ:

  • 架构(Connection、Channel、VHost、Exchange、Queue、Binding)
  • 五种 Exchange 类型和路由规则
  • 消息可靠性保证(Confirm、持久化、手动 ACK)
  • 死信队列和延迟消息
  • .NET 实战(RabbitMQ.Client、MassTransit)
  • 集群高可用(镜像队列、Quorum Queue)
  • 性能调优

下一篇将深入 Kafka:架构、存储、生产者/消费者和 .NET 实战。