| | 1 | | using System; |
| | 2 | | using System.Collections.Generic; |
| | 3 | | using System.Diagnostics; |
| | 4 | | using System.Linq; |
| | 5 | | using System.Text; |
| | 6 | | using System.Text.Json; |
| | 7 | | using System.Threading; |
| | 8 | | using System.Threading.Channels; |
| | 9 | | using System.Threading.Tasks; |
| | 10 | | using Confluent.Kafka; |
| | 11 | | using Microsoft.Extensions.Logging; |
| | 12 | | using SeungYongShim.ProtobufHelper; |
| | 13 | |
|
| | 14 | | namespace SeungYongShim.Kafka |
| | 15 | | { |
/// <summary>
/// Polls Kafka on a dedicated thread and hands each message to callers through a
/// bounded channel. Offsets are not auto-committed; each consumed item carries a
/// commit callback.
/// </summary>
public class KafkaConsumer : IDisposable
{
    private bool disposedValue;

    /// <summary>
    /// Stores the collaborators and creates the bounded hand-off channel (capacity 10).
    /// </summary>
    /// <param name="kafkaConfig">Supplies the broker list and the poll timeout.</param>
    /// <param name="knownTypes">Used to unpack the protobuf <c>Any</c> payload.</param>
    /// <param name="logger">Receives consume and commit diagnostics.</param>
    public KafkaConsumer(KafkaConfig kafkaConfig,
                         ProtoKnownTypes knownTypes,
                         ILogger<KafkaConsumer> logger)
    {
        Logger = logger;
        KafkaConfig = kafkaConfig;
        ProtoKnownTypes = knownTypes;
        ConsumeChannel = Channel.CreateBounded<(Headers, string, Action)>(10);
    }

    public ILogger<KafkaConsumer> Logger { get; }
    public KafkaConfig KafkaConfig { get; }
    public ProtoKnownTypes ProtoKnownTypes { get; }

    /// <summary>Channel of (headers, raw message value, commit callback) tuples.</summary>
    public Channel<(Headers, string, Action)> ConsumeChannel { get; }

    /// <summary>Cancelled by <see cref="Stop"/> to end the polling loop.</summary>
    public CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();

    /// <summary>The polling thread; <c>null</c> until <see cref="Start"/> is called.</summary>
    public Thread KafkaConsumerThread { get; private set; }

    /// <summary>
    /// Reads the next message from the channel, unpacks it, and returns it together
    /// with its commit callback and the id of a "kafka-consume" activity.
    /// </summary>
    /// <param name="timeOut">How long to wait for a message before the read is cancelled.</param>
    /// <returns>The unpacked message wrapped in a <see cref="Commitable"/>.</returns>
    /// <exception cref="OperationCanceledException">No message arrived within <paramref name="timeOut"/>.</exception>
    public async Task<Commitable> ConsumeAsync(TimeSpan timeOut)
    {
        // A CancellationTokenSource built from a delay owns a timer, so dispose it.
        using var cts = new CancellationTokenSource(timeOut);
        var (headers, message, action) = await ConsumeChannel.Reader.ReadAsync(cts.Token);

        // The trace header is optional: FirstOrDefault yields null instead of throwing
        // when the producer did not attach one.
        var traceParent = headers?.FirstOrDefault(x => x.Key is "traceparent")?.GetValueBytes();
        string parentId = traceParent is null ? null : Encoding.UTF8.GetString(traceParent);

        var anyJson = JsonSerializer.Deserialize<AnyJson>(message);
        var o = ProtoKnownTypes.Unpack(anyJson.ToAny());

        using var activity = ActivitySourceStatic.Instance.StartActivity("kafka-consume", ActivityKind.Consumer, parentId);

        return new Commitable(o, action, activity?.Id);
    }

    /// <summary>
    /// Starts the polling thread for the given consumer group and topics.
    /// Does nothing when a polling thread has already been created.
    /// </summary>
    /// <param name="groupId">Kafka consumer group id.</param>
    /// <param name="topics">Topics to subscribe to.</param>
    public void Start(string groupId,
                      IEnumerable<string> topics)
    {
        if (KafkaConsumerThread is not null) return;

        var config = new ConsumerConfig
        {
            BootstrapServers = KafkaConfig.Brokers,
            GroupId = groupId,
            EnableAutoCommit = false,
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnablePartitionEof = true,
            PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
        };

        // The thread body is synchronous on purpose. An async lambda here would be
        // async void: the thread would end at the first incomplete await and the loop
        // would continue on the thread pool, so Join could no longer wait for shutdown.
        KafkaConsumerThread = new Thread(() => RunConsumeLoop(config, topics));

        KafkaConsumerThread.Start();
    }

    /// <summary>
    /// Polling loop: consumes records until cancellation or an unexpected error,
    /// then closes the consumer and completes the channel.
    /// </summary>
    private void RunConsumeLoop(ConsumerConfig config, IEnumerable<string> topics)
    {
        var token = CancellationTokenSource.Token;
        using (var consumer = new ConsumerBuilder<string, string>(config).Build())
        {
            consumer.Subscribe(topics);

            try
            {
                while (true)
                {
                    token.ThrowIfCancellationRequested();
                    try
                    {
                        var cr = consumer.Consume(KafkaConfig.TimeOut);

                        // Original note: "reason currently unknown".
                        // NOTE(review): presumably the poll timed out without a message - confirm.
                        if (cr is null) continue;
                        if (cr.IsPartitionEOF) continue;

                        Action commit = () =>
                        {
                            try
                            {
                                consumer.Commit(cr);
                            }
                            catch (KafkaException e)
                            {
                                Logger.LogError($"Commit error: {e.Error.Reason}");
                            }
                        };

                        // Block this dedicated thread until the channel has room. The token
                        // lets Stop() interrupt a write that is parked on a full channel.
                        ConsumeChannel.Writer
                                      .WriteAsync((cr.Message.Headers, cr.Message.Value, commit), token)
                                      .AsTask()
                                      .GetAwaiter()
                                      .GetResult();
                    }
                    catch (ConsumeException ex)
                    {
                        Logger.LogWarning(ex, "");
                    }
                }
            }
            catch (Exception ex)
            {
                // Cancellation also lands here; readers observe it through the channel.
                ConsumeChannel.Writer.TryComplete(ex);
                Logger.LogError(ex, "");
            }
            finally
            {
                consumer.Close();
                ConsumeChannel.Writer.TryComplete();
            }
        }
    }

    /// <summary>Requests the polling loop to stop by cancelling the shared token.</summary>
    public void Stop() => CancellationTokenSource.Cancel();

    /// <summary>
    /// Stops the polling loop and waits up to five seconds for its thread to finish.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                Stop();
                // Start may never have been called, in which case there is no thread to join.
                KafkaConsumerThread?.Join(TimeSpan.FromSeconds(5));
            }

            KafkaConsumerThread = null;
            disposedValue = true;
        }
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }
}
| | 150 | | } |