Summary

Class: SeungYongShim.Kafka.KafkaProducer
Assembly: SeungYongShim.Kafka
File(s): /home/runner/work/SeungYongShim.Kafka/SeungYongShim.Kafka/src/SeungYongShim.Kafka/KafkaProducer.cs
Covered lines: 43
Uncovered lines: 5
Coverable lines: 48
Total lines: 84
Line coverage: 89.5% (43 of 48)
Covered branches: 3
Total branches: 6
Branch coverage: 50% (3 of 6)
Tag: 50_866899697

Metrics

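| Method | Branch coverage | Crap Score | Cyclomatic complexity | NPath complexity | Sequence coverage |
|---|---|---|---|---|---|
| .cctor() | 0% | 2 | 1 | 0 | 0% |
| .ctor(...) | 0% | 1 | 1 | 0 | 100% |
| SendAsync() | 66.67% | 14.73 | 12 | 2 | 73.33% |
| Dispose(...) | 60% | 4 | 4 | 4 | 100% |
| Dispose() | 0% | 1 | 1 | 0 | 100% |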

File(s)

/home/runner/work/SeungYongShim.Kafka/SeungYongShim.Kafka/src/SeungYongShim.Kafka/KafkaProducer.cs

# | Line | Line coverage
 1using System;
 2using System.Diagnostics;
 3using System.Text;
 4using System.Threading.Tasks;
 5using Confluent.Kafka;
 6using Google.Protobuf;
 7using Google.Protobuf.WellKnownTypes;
 8using Microsoft.Extensions.Logging;
 9using OpenTelemetry.Context.Propagation;
 10
 11namespace SeungYongShim.Kafka
 12{
 13    public class KafkaProducer : IDisposable
 14    {
 15        private bool disposedValue;
 016        private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
 17
 218        public KafkaProducer(KafkaConfig kafkaConfig, ILogger<KafkaConsumer> logger)
 219        {
 220            KafkaConfig = kafkaConfig;
 221            _log = logger;
 22
 223            var config = new ProducerConfig
 224            {
 225                BootstrapServers = kafkaConfig.Brokers,
 226                Acks = Acks.All,
 227            };
 28
 229            Producer = new ProducerBuilder<string, string>(config).Build();
 230        }
 31
 32        public async Task SendAsync(IMessage m, string topic, string key = "1")
 233        {
 34            try
 235            {
 236                using var activity = ActivitySourceStatic.Instance.StartActivity("kafka", ActivityKind.Producer);
 37
 238                var message = JsonFormatter.ToDiagnosticString(Any.Pack(m));
 239                var headers = new Headers()
 240                {
 241                    new Header("traceparent", Encoding.UTF8.GetBytes(activity?.Id ?? string.Empty))
 242                };
 43
 244                var ret = await Producer.ProduceAsync(topic, new Message<string, string>
 245                {
 246                    Key = key,
 247                    Headers = headers,
 248                    Value = message
 449                }); ;
 50
 251                activity?.AddTag("topic", topic);
 252                activity?.AddTag("message", message);
 253            }
 054            catch (Exception ex)
 055            {
 056                _log.LogWarning(ex, "");
 057                throw;
 58            }
 259        }
 60
 61        private ILogger<KafkaConsumer> _log { get; }
 62        public KafkaConfig KafkaConfig { get; }
 63        private IProducer<string, string> Producer { get; }
 64
 65        protected virtual void Dispose(bool disposing)
 266        {
 267            if (!disposedValue)
 268            {
 269                if (disposing)
 270                {
 271                    Producer?.Dispose();
 272                }
 73
 274                disposedValue = true;
 275            }
 276        }
 77
 78        public void Dispose()
 279        {
 280            Dispose(disposing: true);
 281            GC.SuppressFinalize(this);
 282        }
 83    }
 84}