1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
/// <summary>
/// Thin wrapper around a Confluent.Kafka <see cref="IProducer{TKey, TValue}"/> that
/// serializes payloads to JSON and publishes them with string keys.
/// </summary>
/// <remarks>
/// The instance owns the underlying producer. Disposing the service flushes any
/// queued messages (bounded by <see cref="FlushTimeout"/>) and then releases the producer.
/// </remarks>
public class KafkaProducerService : IDisposable
{
    /// <summary>Configuration key holding the broker list.</summary>
    private const string BootstrapServersKey = "Kafka:BootstrapServers";

    /// <summary>Upper bound on how long <see cref="Dispose"/> waits for queued messages.</summary>
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    private readonly IProducer<string, string> _producer;
    private readonly ILogger<KafkaProducerService> _logger;
    private bool _disposed;

    /// <summary>
    /// Builds the producer from the <c>Kafka:BootstrapServers</c> configuration value.
    /// </summary>
    /// <param name="config">Application configuration; must contain <c>Kafka:BootstrapServers</c>.</param>
    /// <param name="logger">Logger used for delivery diagnostics.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> or <paramref name="logger"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The bootstrap servers setting is missing or blank.</exception>
    public KafkaProducerService(IConfiguration config, ILogger<KafkaProducerService> logger)
    {
        if (config == null)
        {
            throw new ArgumentNullException(nameof(config));
        }

        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        // Fail fast with a clear message instead of letting the client fail later
        // with a broker-resolution error that does not mention the missing setting.
        var bootstrapServers = config[BootstrapServersKey];
        if (string.IsNullOrWhiteSpace(bootstrapServers))
        {
            throw new InvalidOperationException(
                $"Configuration value '{BootstrapServersKey}' is required but was not provided.");
        }

        var producerConfig = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            // Acknowledgement and idempotence settings.
            Acks = Acks.All,
            EnableIdempotence = true,
            // Batching and compression settings.
            LingerMs = 10,
            BatchSize = 32768,
            CompressionType = CompressionType.Lz4,
            // Retry settings.
            MessageSendMaxRetries = 3,
            RetryBackoffMs = 100
        };

        _producer = new ProducerBuilder<string, string>(producerConfig).Build();
    }

    /// <summary>
    /// Serializes <paramref name="value"/> to JSON and publishes it to <paramref name="topic"/>,
    /// completing once the underlying producer reports the delivery result.
    /// </summary>
    /// <typeparam name="T">Type of the payload to serialize.</typeparam>
    /// <param name="topic">Destination topic; must not be null or empty.</param>
    /// <param name="key">Message key passed through to the producer unchanged.</param>
    /// <param name="value">Payload serialized with <see cref="JsonSerializer"/>.</param>
    /// <exception cref="ArgumentException"><paramref name="topic"/> is null or empty.</exception>
    /// <exception cref="ObjectDisposedException">The service has already been disposed.</exception>
    /// <exception cref="ProduceException{TKey, TValue}">Delivery failed; logged and rethrown.</exception>
    public async Task ProduceAsync<T>(string topic, string key, T value)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentException("Topic must not be null or empty.", nameof(topic));
        }

        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(KafkaProducerService));
        }

        var message = new Message<string, string>
        {
            Key = key,
            Value = JsonSerializer.Serialize(value)
        };

        try
        {
            var result = await _producer.ProduceAsync(topic, message);
            _logger.LogDebug("发送到 {Topic} Partition:{Partition} Offset:{Offset}",
                topic, result.Partition, result.Offset);
        }
        catch (ProduceException<string, string> ex)
        {
            // Record topic and key context before propagating; callers still see the original exception.
            _logger.LogError(ex, "Failed to deliver message to {Topic} Key:{Key} Reason:{Reason}",
                topic, key, ex.Error.Reason);
            throw;
        }
    }

    /// <summary>
    /// Flushes queued messages (waiting at most <see cref="FlushTimeout"/>) and releases the
    /// producer. Safe to call more than once; never throws because of a failed flush.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        try
        {
            // Disposing the producer does not wait for in-flight messages, so drain them first.
            _producer.Flush(FlushTimeout);
        }
        catch (Exception ex)
        {
            // Dispose must not throw; report the failed flush and continue releasing the producer.
            _logger.LogWarning(ex, "Kafka producer flush failed during dispose");
        }
        finally
        {
            _producer.Dispose();
        }
    }
}
|