< Summary

Class: SeungYongShim.Kafka.KafkaConsumer
Assembly: SeungYongShim.Kafka
File(s): /home/runner/work/SeungYongShim.Kafka/SeungYongShim.Kafka/src/SeungYongShim.Kafka/KafkaConsumer.cs
Covered lines: 105
Uncovered lines: 0
Coverable lines: 105
Total lines: 150
Line coverage: 100% (105 of 105)
Covered branches: 6
Total branches: 10
Branch coverage: 60% (6 of 10)
Tag: 50_866899697

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | NPath complexity | Sequence coverage
.ctor(...) | 0% | 1 | 1 | 0 | 100%
ConsumeAsync() | 0% | 10 | 10 | 0 | 100%
Start(...) | 66.67% | 2.01 | 2 | 2 | 88.89%
>c__DisplayClass22_0/<<Start() | 71.43% | 11.42 | 11 | 8 | 84.85%
Stop() | 0% | 1 | 1 | 0 | 100%
Dispose(...) | 66.67% | 3 | 3 | 2 | 100%
Dispose() | 0% | 1 | 1 | 0 | 100%

File(s)

/home/runner/work/SeungYongShim.Kafka/SeungYongShim.Kafka/src/SeungYongShim.Kafka/KafkaConsumer.cs

# | Line | Line coverage
 1using System;
 2using System.Collections.Generic;
 3using System.Diagnostics;
 4using System.Linq;
 5using System.Text;
 6using System.Text.Json;
 7using System.Threading;
 8using System.Threading.Channels;
 9using System.Threading.Tasks;
 10using Confluent.Kafka;
 11using Microsoft.Extensions.Logging;
 12using SeungYongShim.ProtobufHelper;
 13
 14namespace SeungYongShim.Kafka
 15{
 16    public class KafkaConsumer : IDisposable
 17    {
 18        private bool disposedValue;
 19
 220        public KafkaConsumer(KafkaConfig kafkaConfig,
 221                             ProtoKnownTypes knownTypes,
 222                             ILogger<KafkaConsumer> logger)
 223        {
 224            Logger = logger;
 225            KafkaConfig = kafkaConfig;
 226            ProtoKnownTypes = knownTypes;
 227            ConsumeChannel = Channel.CreateBounded<(Headers, string, Action)>(10);
 228        }
 29
 30        public ILogger<KafkaConsumer> Logger { get; }
 31        public KafkaConfig KafkaConfig { get; }
 32        public ProtoKnownTypes ProtoKnownTypes { get; }
 33        public Channel<(Headers, string, Action)> ConsumeChannel { get; }
 234        public CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();
 35        public Thread KafkaConsumerThread { get; private set; }
 36
 37        public async Task<Commitable> ConsumeAsync(TimeSpan timeOut)
 238        {
 239            var cts = new CancellationTokenSource(timeOut);
 240            var (headers, message, action) = await ConsumeChannel.Reader.ReadAsync(cts.Token);
 41
 442            var activityId = headers.First(x => x.Key is "traceparent")?.GetValueBytes();
 243            var anyJson = JsonSerializer.Deserialize<AnyJson>(message);
 244            var o = ProtoKnownTypes.Unpack(anyJson.ToAny());
 45
 246            using var activity = ActivitySourceStatic.Instance.StartActivity("kafka-consume", ActivityKind.Consumer, Enc
 47
 248            return new Commitable(o, action, activity?.Id);
 249        }
 50
 51        public void Start(string groupId,
 52                          IEnumerable<string> topics)
 253        {
 254            if (KafkaConsumerThread is not null) return;
 55
 256            var cancellationToken = CancellationTokenSource.Token;
 257            var timeout = TimeSpan.FromMinutes(10);
 58
 259            var config = new ConsumerConfig
 260            {
 261                BootstrapServers = KafkaConfig.Brokers,
 262                GroupId = groupId,
 263                EnableAutoCommit = false,
 264                StatisticsIntervalMs = 5000,
 265                SessionTimeoutMs = 6000,
 266                AutoOffsetReset = AutoOffsetReset.Earliest,
 267                EnablePartitionEof = true,
 268                PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
 269            };
 70
 271            KafkaConsumerThread = new Thread(async () =>
 472            {
 473                var token = CancellationTokenSource.Token;
 474                using (var consumer = new ConsumerBuilder<string, string>(config).Build())
 475                {
 476                    consumer.Subscribe(topics);
 277
 278                    try
 479                    {
 880                        while (true)
 881                        {
 882                            token.ThrowIfCancellationRequested();
 283                            try
 684                            {
 685                                var cr = consumer.Consume(KafkaConfig.TimeOut);
 286
 687                                if (cr is null) continue; // 이유를 현재 모르겠음
 888                                if (cr.IsPartitionEOF) continue;
 289
 490                                await ConsumeChannel.Writer.WriteAsync((cr.Message.Headers,
 491                                                                        cr.Message.Value,
 492                                                                        () =>
 693                                                                        {
 494                                                                            try
 695                                                                            {
 696                                                                                consumer.Commit(cr);
 697                                                                            }
 498                                                                            catch (KafkaException e)
 499                                                                            {
 4100                                                                                Logger.LogError($"Commit error: {e.Error
 4101                                                                            }
 6102                                                                        }
 4103                                ));
 4104                            }
 2105                            catch (ConsumeException ex)
 2106                            {
 2107                                Logger.LogWarning(ex, "");
 2108                            }
 4109                        }
 2110                    }
 4111                    catch (Exception ex)
 4112                    {
 4113                        ConsumeChannel.Writer.TryComplete(ex);
 4114                        Logger.LogError(ex, "");
 4115                    }
 2116                    finally
 4117                    {
 4118                        consumer.Close();
 4119                        ConsumeChannel.Writer.TryComplete();
 4120                    }
 4121                }
 4122            });
 123
 2124            KafkaConsumerThread.Start();
 2125        }
 126
 2127        public void Stop() => CancellationTokenSource.Cancel();
 128
 129        protected virtual void Dispose(bool disposing)
 2130        {
 2131            if (!disposedValue)
 2132            {
 2133                if (disposing)
 2134                {
 2135                    Stop();
 2136                    KafkaConsumerThread.Join(TimeSpan.FromSeconds(5));
 2137                }
 138
 2139                KafkaConsumerThread = null;
 2140                disposedValue = true;
 2141            }
 2142        }
 143
 144        public void Dispose()
 2145        {
 2146            Dispose(disposing: true);
 2147            GC.SuppressFinalize(this);
 2148        }
 149    }
 150}