消息队列(三):Kafka 深入

写在前面

本文是消息队列系列的第三篇,深入 Kafka 的架构设计、存储机制、生产者/消费者模型和 .NET 实战。前置知识:消息队列核心概念(第一篇)。


一、Kafka 架构

1.1 整体架构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
Producer → Broker Cluster → Consumer Group
         ZooKeeper / KRaft(元数据管理)

核心组件:
- Broker       — Kafka 服务器节点
- Topic        — 消息分类(逻辑概念)
- Partition    — Topic 的物理分片
- Replica      — Partition 的副本
- Producer     — 消息生产者
- Consumer     — 消息消费者
- Consumer Group — 消费者组
- ZooKeeper/KRaft — 集群元数据和协调

1.2 和 RabbitMQ 的本质区别

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
RabbitMQ:
  - 以 Queue 为中心
  - 消息消费后删除
  - 路由逻辑在 Exchange
  - Push 模式

Kafka:
  - 以 Topic/Partition 为中心
  - 消息持久化,保留一段时间
  - 路由逻辑在 Producer(决定发到哪个 Partition)
  - Pull 模式
  - 设计目标是高吞吐的分布式日志系统

1.3 消息流转过程

1
2
3
4
5
1. Producer 决定消息发到哪个 Topic 的哪个 Partition
2. Partition Leader 接收并写入本地日志
3. Follower 从 Leader 拉取并复制
4. Consumer Group 中的 Consumer 各自消费分配到的 Partition
5. Consumer 定期提交 Offset 记录消费进度

二、Topic 和 Partition

2.1 Partition 的作用

1
2
3
4
1. 并行处理    — 不同 Partition 可以被不同 Consumer 并行消费
2. 水平扩展    — 增加 Partition 提高吞吐量
3. 顺序保证    — 同一 Partition 内消息有序
4. 容错        — 每个 Partition 有多个副本

2.2 Partition 数量选择

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
考虑因素:
- 吞吐量需求    — Partition 越多,并行度越高
- Consumer 数量 — 一个 Partition 只能被 Consumer Group 中一个 Consumer 消费
- Broker 数量   — Partition 尽量均匀分布在各 Broker

经验值:
- 小规模:每个 Topic 3-6 个 Partition
- 中规模:每个 Topic 12-24 个 Partition
- 大规模:可以到几百个

注意:Partition 数量只能增不能减

2.3 Key 和 Partition 路由

1
2
3
4
5
Producer 发送消息时可以指定 Key:
- Key 为 null   — 轮询分配到不同 Partition(Round Robin)
- Key 不为 null  — 对 Key 做 Hash,相同 Key 到同一个 Partition

这保证了相同业务 ID 的消息(如同一订单)都在同一个 Partition,保持顺序
1
2
3
4
5
6
7
8
// 指定 Key 发送
var message = new Message<string, string>
{
    Key = $"order-{orderId}",    // 相同订单 ID → 同一 Partition
    Value = JsonSerializer.Serialize(orderEvent)
};

await producer.ProduceAsync("orders", message);

三、存储机制

3.1 日志追加(Append-Only Log)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
Kafka 的核心存储设计:

每个 Partition 是一个追加写入的日志文件:
Offset  Message
0       {"orderId":1,"action":"created"}
1       {"orderId":1,"action":"paid"}
2       {"orderId":2,"action":"created"}
3       {"orderId":1,"action":"shipped"}
...

特点:
- 顺序写磁盘 → 极快(600MB/s 顺序写 vs 100KB/s 随机写)
- 不可修改 → 只能追加
- 通过 Offset 随机读取

3.2 Segment 分段

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
一个 Partition 由多个 Segment 组成:

Partition (topic-order-0/)
├── 00000000000000000000.log     ← Segment 1(Offset 0 开始)
├── 00000000000000000000.index   ← 偏移量索引
├── 00000000000000000000.timeindex ← 时间索引
├── 00000000000000000036.log     ← Segment 2(Offset 36 开始)
├── 00000000000000000036.index
└── 00000000000000000036.timeindex

Segment 滚动条件:
- 达到 1GB(log.segment.bytes)
- 达到 7 天(log.segment.ms)

好处:
- 快速定位消息(二分查找 Segment + 索引)
- 方便清理过期数据(删除整个 Segment)

3.3 零拷贝(Zero-Copy)

1
2
3
4
5
6
7
8
传统数据传输(4 次拷贝):
  磁盘 → 内核缓冲区 → 用户空间 → Socket 缓冲区 → 网卡

Kafka 零拷贝(2 次拷贝):
  磁盘 → 内核缓冲区 → 网卡

通过 Java 的 FileChannel.transferTo()(Linux 的 sendfile 系统调用)
配合页缓存(Page Cache),Kafka 极少做用户空间的数据拷贝

3.4 页缓存(Page Cache)

1
2
3
4
5
6
Kafka 不自己管理缓存,而是利用操作系统的页缓存:
- 写入时先写页缓存,由 OS 异步刷盘
- 读取时先从页缓存读,命中就不用访问磁盘
- 生产者和消费者访问的是同一块页缓存(热数据在内存中)

这就是 Kafka 高吞吐的关键:顺序写 + 页缓存 + 零拷贝

3.5 数据保留和清理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
保留策略(二选一):
- 按时间:log.retention.hours=168(默认 7 天)
- 按大小:log.retention.bytes=1073741824(1GB)

清理策略:
- delete    — 删除整个 Segment(默认)
- compact   — 保留每个 Key 的最新值(适合 changelog)

Compact 示例:
  写入:key=user1, value=张三
  写入:key=user2, value=李四
  写入:key=user1, value=王五(更新)
  Compact 后只保留:
    key=user1, value=王五
    key=user2, value=李四

四、生产者

4.1 发送模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 1. 发后即忘(Fire and Forget)— 最快,可能丢
producer.Produce("orders", message);

// 2. 同步发送 — 等待 ACK,最慢但最安全
var result = await producer.ProduceAsync("orders", message);
Console.WriteLine($"Offset: {result.Offset}");

// 3. 异步发送 + 回调 — 推荐,兼顾性能和可靠性
producer.Produce("orders", message, report =>
{
    if (report.Error.Code != ErrorCode.NoError)
        logger.LogError("发送失败:{Error}", report.Error.Reason);
    else
        logger.LogInformation("发送成功:Partition={Partition}, Offset={Offset}",
            report.Partition, report.Offset);
});

4.2 ACK 配置

1
2
3
4
5
6
7
8
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    Acks = Acks.All,  // 等同于 acks=all
    // Acks.None     — 不等 ACK(最快,可能丢)
    // Acks.Leader   — Leader 写入就返回(默认)
    // Acks.All      — 所有 ISR 副本都写入(最安全)
};

4.3 批量发送和缓冲

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",

    // 批量大小(字节)
    BatchSize = 16384,          // 16KB

    // 等待时间(毫秒)— 攒够一批或超时就发
    LingerMs = 5,               // 5ms

    // 缓冲区大小
    BufferMemory = 33554432,    // 32MB

    // 压缩
    CompressionType = CompressionType.Lz4,

    // 重试
    MessageSendMaxRetries = 3,
    RetryBackoffMs = 100,
};
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
批量发送原理:
  Producer 内部有一个缓冲区:
  - 消息先写入缓冲区
  - 攒够一个 Batch(BatchSize)或等待超时(LingerMs)就发送
  - 大幅减少网络请求次数

LingerMs 和 BatchSize 的权衡:
  LingerMs 大 → 批更大但延迟高
  LingerMs 小 → 批更小但延迟低
  建议:LingerMs=5-20ms,BatchSize=16-64KB

4.4 幂等生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    Acks = Acks.All,
    EnableIdempotence = true,   // 开启幂等(自动设置 acks=all)

    // 幂等生产者保证:
    // 即使网络重试,同一条消息也只写入一次
    // 通过 Producer ID + Sequence Number 去重
    // 注意:只保证单分区的单会话幂等
};

五、消费者

5.1 Consumer Group

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
核心概念:
- 一个 Consumer Group 中的 Consumer 共同消费一个 Topic
- 每个 Partition 只被 Group 中的一个 Consumer 消费
- 不同 Group 各自独立消费(发布订阅)

示例:
  Topic "orders" 有 3 个 Partition

  Group "sms-group":
    Consumer-1 → Partition-0
    Consumer-2 → Partition-1
    Consumer-3 → Partition-2

  Group "points-group":
    Consumer-A → Partition-0, 1, 2(只有一个 Consumer,消费所有)

注意:
- Consumer 数量 > Partition 数量 → 多余的 Consumer 空闲
- Consumer 数量 < Partition 数量 → 有的 Consumer 消费多个 Partition

5.2 Offset 管理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
Offset:消息在 Partition 中的位置(从 0 递增)

消费进度管理:
- Consumer 定期提交已消费的 Offset
- 重启后从上次提交的 Offset 继续消费

提交方式:
1. 自动提交(enable.auto.commit=true)
   - 定期自动提交(默认 5 秒)
   - 可能重复消费(处理完但没提交就挂了)
   - 简单但不可靠

2. 手动提交(enable.auto.commit=false)
   - 处理完业务后手动调用 Commit
   - 更可靠但代码更复杂
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 手动提交
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "order-sms-group",
    EnableAutoCommit = false,           // 关闭自动提交
    AutoOffsetReset = AutoOffsetReset.Earliest  // 无 Offset 时从头开始
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("orders");

while (!cancellationToken.IsCancellationRequested)
{
    var result = consumer.Consume(CancellationToken.None);

    try
    {
        // 处理业务
        await ProcessOrder(result.Message.Value);

        // 处理成功,提交 Offset
        consumer.Commit(result);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "处理失败,不提交 Offset");
        // 不提交,下次重启会重新消费
    }
}

5.3 Rebalance

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
Consumer Group 中 Consumer 变化时触发 Rebalance:
- 新 Consumer 加入
- Consumer 离开(崩溃、关闭)
- 订阅的 Topic Partition 数量变化

Rebalance 过程:
1. 所有 Consumer 撤回分配的 Partition
2. 重新分配 Partition 给各 Consumer
3. 各 Consumer 从新分配的 Partition 继续

Rebalance 的问题:
- 短暂的消费暂停(Stop The World)
- 可能重复消费(Rebalance 前 Commit 了吗?)
- 频繁 Rebalance 影响性能

减少 Rebalance:
- 合理设置 session.timeout.ms(默认 45s)
- 合理设置 heartbeat.interval.ms(默认 3s)
- 避免消费者处理太慢(超过 session.timeout 会被踢出)
- 使用 CooperativeStickyAssignor(增量 Rebalance)

5.4 消费者配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "order-sms-group",

    // Offset 提交
    EnableAutoCommit = false,

    // 无 Offset 时从哪里开始
    AutoOffsetReset = AutoOffsetReset.Earliest,  // 最早
    // AutoOffsetReset.Latest   — 最新的
    // AutoOffsetReset.Error    — 报错

    // 心跳和超时
    SessionTimeoutMs = 30000,       // 30 秒无心跳认为挂了
    HeartbeatIntervalMs = 3000,     // 3 秒发一次心跳
    MaxPollIntervalMs = 300000,     // 两次 poll 最大间隔(5 分钟)

    // 拉取配置
    FetchMinBytes = 1,              // 最少拉取 1 字节
    FetchMaxWaitMs = 500,           // 最多等 500ms
    MaxPartitionFetchBytes = 1048576, // 每个 Partition 最多拉 1MB
};

六、副本和高可用

6.1 副本机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
每个 Partition 有多个 Replica:
- Leader Replica    — 处理读写请求
- Follower Replica  — 从 Leader 复制数据,不处理客户端请求

副本因子(replication.factor):
- 通常设为 3(1 Leader + 2 Follower)
- 不同 Broker 上各放一个副本

ISR(In-Sync Replicas):
- 和 Leader 保持同步的副本集合
- Follower 落后太多会被移出 ISR
- Leader 挂了,从 ISR 中选新 Leader

6.2 Leader 选举

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
Leader 挂了时:
1. Controller(集群中的一个 Broker)检测到
2. 从 ISR 中选一个 Follower 作为新 Leader
3. 通知所有 Broker 和 Consumer

配置:
  unclean.leader.election.enable=false(默认)
  → 只允许 ISR 中的副本成为 Leader(安全)
  → 如果 ISR 为空,Partition 不可用

  unclean.leader.election.enable=true(不推荐)
  → 允许非 ISR 副本成为 Leader
  → 可用性更高但可能丢数据

6.3 副本配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Topic 级别配置
kafka-topics.sh --create \
  --topic orders \
  --partitions 6 \
  --replication-factor 3 \
  --config min.insync.replicas=2

# min.insync.replicas=2 配合 acks=all
# → 至少 2 个副本(含 Leader)写入成功才返回
# → 3 个副本容忍 1 个宕机

七、Kafka 事务

7.1 Exactly-Once 语义

1
2
3
4
Kafka 事务保证:
- 原子性:一批消息要么全部成功要么全部失败
- 跨 Partition:可以同时写入多个 Partition
- 消费-处理-生产:从 Topic A 消费,处理完写入 Topic B,整个流程 Exactly-Once

7.2 事务生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    TransactionalId = "order-tx-producer-1",  // 事务 ID(必须唯一)
    EnableIdempotence = true,
    Acks = Acks.All
};

using var producer = new ProducerBuilder<string, string>(config).Build();
producer.InitTransactions(TimeSpan.FromSeconds(10));

try
{
    producer.BeginTransaction();

    // 发送多条消息到不同 Partition/Topic
    await producer.ProduceAsync("orders", new Message<string, string> { Key = "order-1", Value = "..." });
    await producer.ProduceAsync("payments", new Message<string, string> { Key = "pay-1", Value = "..." });

    // 提交事务
    producer.CommitTransaction(TimeSpan.FromSeconds(10));
}
catch
{
    // 回滚事务
    producer.AbortTransaction(TimeSpan.FromSeconds(10));
}

7.3 事务消费者

1
2
3
4
5
6
7
8
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "order-processor",
    IsolationLevel = IsolationLevel.ReadCommitted,
    // ReadCommitted  — 只读已提交的事务消息(推荐)
    // ReadUncommitted — 读所有消息包括未提交的
};

八、.NET 实战

8.1 使用 Confluent.Kafka

1
dotnet add package Confluent.Kafka

8.2 完整 Producer 封装

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class KafkaProducerService : IDisposable
{
    private readonly IProducer<string, string> _producer;
    private readonly ILogger<KafkaProducerService> _logger;

    public KafkaProducerService(IConfiguration config, ILogger<KafkaProducerService> logger)
    {
        _logger = logger;
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = config["Kafka:BootstrapServers"],
            Acks = Acks.All,
            EnableIdempotence = true,
            LingerMs = 10,
            BatchSize = 32768,
            CompressionType = CompressionType.Lz4,
            MessageSendMaxRetries = 3,
            RetryBackoffMs = 100
        };

        _producer = new ProducerBuilder<string, string>(producerConfig).Build();
    }

    public async Task ProduceAsync<T>(string topic, string key, T value)
    {
        var message = new Message<string, string>
        {
            Key = key,
            Value = JsonSerializer.Serialize(value)
        };

        var result = await _producer.ProduceAsync(topic, message);

        _logger.LogDebug("发送到 {Topic} Partition:{Partition} Offset:{Offset}",
            topic, result.Partition, result.Offset);
    }

    public void Dispose() => _producer?.Dispose();
}

8.3 完整 Consumer 封装

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class KafkaConsumerService<T> : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly ILogger<KafkaConsumerService<T>> _logger;
    private readonly Func<string, Task> _handler;
    private readonly string _topic;

    public KafkaConsumerService(
        IConfiguration config,
        ILogger<KafkaConsumerService<T>> logger,
        string topic,
        string groupId,
        Func<string, Task> handler)
    {
        _logger = logger;
        _handler = handler;
        _topic = topic;

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = config["Kafka:BootstrapServers"],
            GroupId = groupId,
            EnableAutoCommit = false,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            SessionTimeoutMs = 30000,
            MaxPollIntervalMs = 300000
        };

        _consumer = new ConsumerBuilder<string, string>(consumerConfig)
            .SetErrorHandler((_, e) => _logger.LogError("Kafka 错误:{Error}", e.Reason))
            .Build();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe(_topic);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var result = _consumer.Consume(stoppingToken);

                await _handler(result.Message.Value);

                _consumer.Commit(result);
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "消费异常");
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "处理消息异常");
                // 不提交,下次重新消费
            }
        }

        _consumer.Close();
    }

    public override void Dispose()
    {
        _consumer?.Dispose();
        base.Dispose();
    }
}

8.4 注册和使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 注册 Producer
builder.Services.AddSingleton<KafkaProducerService>();

// 注册 Consumer(BackgroundService)
builder.Services.AddHostedService(sp =>
    new KafkaConsumerService<OrderEvent>(
        sp.GetRequiredService<IConfiguration>(),
        sp.GetRequiredService<ILogger<KafkaConsumerService<OrderEvent>>>(),
        topic: "orders",
        groupId: "order-sms-group",
        handler: async (json) =>
        {
            var order = JsonSerializer.Deserialize<OrderEvent>(json);
            await SendSmsAsync(order!.CustomerEmail, "您的订单已创建");
        }));

九、常用运维命令

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Topic 管理
kafka-topics.sh --create --topic orders --partitions 6 --replication-factor 3 --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
kafka-topics.sh --alter --topic orders --partitions 12 --bootstrap-server localhost:9092  # 只能增

# 消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group order-sms-group --bootstrap-server localhost:9092

# 查看消费进度(LAG)
kafka-consumer-groups.sh --describe --group order-sms-group --bootstrap-server localhost:9092
# 输出:CURRENT-OFFSET  LOG-END-OFFSET  LAG
# LAG > 0 说明消费有积压

# 重置 Offset
kafka-consumer-groups.sh --reset-offsets --group order-sms-group --topic orders --to-earliest --execute --bootstrap-server localhost:9092

# 配置修改
kafka-configs.sh --alter --entity-type topics --entity-name orders --add-config retention.ms=604800000 --bootstrap-server localhost:9092

# 查看配置
kafka-configs.sh --describe --entity-type topics --entity-name orders --bootstrap-server localhost:9092

十、小结

本文深入学习了 Kafka:

  • 架构(Broker、Topic、Partition、Consumer Group)
  • 和 RabbitMQ 的本质区别
  • 存储机制(追加日志、Segment、零拷贝、页缓存)
  • 生产者(发送模式、ACK、批量、幂等)
  • 消费者(Consumer Group、Offset、Rebalance)
  • 副本和高可用
  • Kafka 事务和 Exactly-Once
  • .NET 实战(Confluent.Kafka)
  • 运维常用命令

下一篇将对比 RabbitMQ 和 Kafka,帮助你在实际项目中做出选型。