Kafka Producer and Consumer Example in .NET 6 with ASP.NET Core

In this post, we’ll look at Kafka, a distributed streaming platform, and create a Producer and a Consumer in .NET 6 using ASP.NET Core. We’ll cover Kafka’s core concepts, explain each code snippet, and build a working application that sends and receives messages.

What is Kafka?
Kafka is a high-throughput, distributed messaging system designed to handle real-time data streams. It has three key components:

  1. Producer: Sends data (messages) to Kafka topics.
  2. Consumer: Reads data (messages) from Kafka topics.
  3. Broker: A Kafka server that stores and manages incoming messages. Kafka typically runs in a cluster with multiple brokers.

Kafka organizes data into topics, which are like categories for storing messages. Messages are immutable once written and are ordered within each partition of a topic.

Core Kafka Concepts

  1. Topic: Logical channel to which messages are sent.
  2. Partition: Each topic is divided into partitions for parallel processing. Partitions ensure scalability.
  3. Offset: Sequential position that uniquely identifies a message within a partition.
  4. Broker: Kafka server managing topics and partitions.
  5. Producer: Sends data to Kafka topics.
  6. Consumer: Reads data from Kafka topics and processes it.
  7. Group: Consumers are organized into groups to share load and ensure each message is processed by one consumer within the group.
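
To make partitions and offsets more concrete, here is a toy in-memory model. It is not how Kafka is implemented: ToyTopic, Append, Read and the simple hash used to pick a partition are hypothetical names and simplifications introduced only for illustration (real clients use a different partitioner).

```csharp
using System;
using System.Collections.Generic;

// Toy model only: each partition is an append-only list, and a message's
// offset is simply its index within that list.
public class ToyTopic
{
    private readonly List<string>[] _partitions;

    public ToyTopic(int partitionCount)
    {
        _partitions = new List<string>[partitionCount];
        for (var i = 0; i < partitionCount; i++)
            _partitions[i] = new List<string>();
    }

    // Messages with the same key always map to the same partition,
    // which is what preserves ordering per key.
    public (int Partition, long Offset) Append(string key, string value)
    {
        var partition = PartitionFor(key);
        _partitions[partition].Add(value);
        return (partition, _partitions[partition].Count - 1);
    }

    public string Read(int partition, long offset) => _partitions[partition][(int)offset];

    // Simplified deterministic hash (an assumption; not Kafka's partitioner).
    private int PartitionFor(string key)
    {
        var hash = 0;
        foreach (var c in key)
            hash = unchecked(hash * 31 + c) & 0x7fffffff;
        return hash % _partitions.Length;
    }
}
```

Appending two messages with the key "apple" to a three-partition ToyTopic places both in the same partition, at offsets 0 and 1.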

Prerequisites

  1. .NET 6 SDK: Download and install from the official .NET website.
  2. Kafka Installed: Follow the Kafka installation guide or use Docker to set up Kafka.

Getting Started with Kafka in .NET 6

Step 1: Set up Kafka

1. Start ZooKeeper (Kafka’s dependency):

zookeeper-server-start.bat ..\..\config\zookeeper.properties

2. Start Kafka:

kafka-server-start.bat ..\..\config\server.properties

3. Create a Kafka topic for this example:

kafka-topics.bat --create --topic fruit --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

Step 2: Create a .NET 6 Project

Run the following command to create an ASP.NET Core Web API project:

dotnet new webapi -n KafkaProducerConsumer
cd KafkaProducerConsumer

Step 3: Install Kafka Library
Kafka communication in .NET is enabled by the Confluent.Kafka library. Install it via NuGet, along with Swashbuckle.AspNetCore for the Swagger UI used later:

dotnet add package Confluent.Kafka
dotnet add package Swashbuckle.AspNetCore

Kafka in .NET 6: Step-by-Step Implementation

We will build two services:

  1. Kafka Producer Service: Sends messages to Kafka topics.
  2. Kafka Consumer Service: Continuously listens to and processes messages from Kafka topics.

Step 4: Configure Kafka Settings

Add Kafka configurations in appsettings.json to simplify access throughout the application:

{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Topic": "fruit",
    "GroupId": "my-consumer-group"
  }
}

Explanation:

  • BootstrapServers: Address of the Kafka broker.
  • Topic: The topic where messages will be sent or received.
  • GroupId: Identifies the consumer group for message processing.
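
The services later in this post hard-code the broker address instead of reading these settings. As a minimal sketch of how the "Kafka" section could be read, assuming it contains the three keys described above, the section can be bound to a small class. KafkaSettings and KafkaSettingsLoader are hypothetical names and are not part of the original project.

```csharp
using Microsoft.Extensions.Configuration;

// Hypothetical settings class mirroring the "Kafka" section of appsettings.json.
public class KafkaSettings
{
    public string BootstrapServers { get; set; } = "localhost:9092";
    public string Topic { get; set; } = string.Empty;
    public string GroupId { get; set; } = string.Empty;
}

public static class KafkaSettingsLoader
{
    // Binds the "Kafka" section; falls back to the defaults above if the section is missing.
    public static KafkaSettings Load(IConfiguration configuration) =>
        configuration.GetSection("Kafka").Get<KafkaSettings>() ?? new KafkaSettings();
}
```

In Program.cs this could be called as KafkaSettingsLoader.Load(builder.Configuration), with the result passed to whichever service needs the broker address, topic, or group ID.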

Step 5: Create the Kafka Producer Service

Create a folder named Services and add KafkaProducerService.cs.

using Confluent.Kafka;

namespace KafkaExample.Services;

public interface IKafkaProducerService
{
    Task SendMessageAsync(string topic, string message);
}

public class KafkaProducerService : IKafkaProducerService
{
    private readonly IProducer<Null, string> _producer;

    // Constructor to initialize Kafka producer with configuration
    public KafkaProducerService()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092" // Kafka broker address; must match your setup
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    // Method to send message to Kafka topic
    public async Task SendMessageAsync(string topic, string message)
    {
        try
        {
            // Send message to the specified Kafka topic
            await _producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
            Console.WriteLine($"Message '{message}' sent to topic '{topic}'.");
        }
        catch (Exception ex)
        {
            // Log any errors encountered while sending message
            Console.WriteLine($"Error sending message to Kafka: {ex.Message}");
            throw;
        }
    }
}

Explanation:

  • ProducerConfig: Configures the producer, specifying the Kafka broker.
  • ProduceAsync: Sends a message to Kafka asynchronously.
  • Null: The key type. The key is left unset because our example doesn’t use keyed messages.
  • _producer: The Kafka producer instance sends messages to the specified topic.
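
ProduceAsync also returns a delivery result that reports the partition and offset assigned to the message. The sketch below, which assumes a broker at localhost:9092, sends a keyed message and returns that position. ProducerSketch, CreateMessage and SendKeyedAsync are hypothetical names used only for illustration. Creating a producer per call keeps the example short; a long-lived producer, as in KafkaProducerService, is the usual choice.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ProducerSketch
{
    // Builds a message with a string key; messages sharing a key go to the same partition.
    public static Message<string, string> CreateMessage(string key, string value) =>
        new Message<string, string> { Key = key, Value = value };

    // Sends one keyed message and returns where the broker stored it.
    // Assumes a broker is reachable at localhost:9092.
    public static async Task<TopicPartitionOffset> SendKeyedAsync(string topic, string key, string value)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
        using var producer = new ProducerBuilder<string, string>(config).Build();
        try
        {
            var result = await producer.ProduceAsync(topic, CreateMessage(key, value));
            return result.TopicPartitionOffset;
        }
        catch (ProduceException<string, string> ex)
        {
            // Delivery failures surface as ProduceException with a broker-supplied reason.
            Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
            throw;
        }
    }
}
```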

Step 6: Create the Kafka Consumer Service

Add KafkaConsumerService.cs to the Services folder.

using Confluent.Kafka;
using System;
using System.Threading.Tasks;

namespace KafkaExample.Services
{
    public class KafkaConsumerService
    {
        private readonly IConsumer<Ignore, string> _consumer;

        public KafkaConsumerService()
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "my-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            // Ignore: the message key is not deserialized; values are read as strings
            _consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        }

        // Blocks the calling thread and prints every message received on the topic
        public void ConsumeMessages(string topic)
        {
            _consumer.Subscribe(topic);

            try
            {
                while (true)
                {
                    var consumeResult = _consumer.Consume();
                    Console.WriteLine($"Consumed message: {consumeResult.Message.Value}");
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error consuming message: {e.Error.Reason}");
            }
            finally
            {
                // Leave the consumer group cleanly
                _consumer.Close();
            }
        }
    }
}

Explanation:

  • ConsumerConfig: Configures the consumer to connect to the broker and specify the consumer group.
  • Subscribe: Subscribes the consumer to a topic.
  • Consume: Reads messages from the topic.
  • AutoOffsetReset.Earliest: Ensures the consumer starts reading from the beginning of the topic if the consumer group has no committed offsets.
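
ConsumeMessages blocks its calling thread, so something has to run it in the background. One possible approach, sketched here under the assumption that the broker, group, and topic match the values used above, is a hosted service. KafkaConsumerHostedService is a hypothetical name and is not part of the original project.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service that runs a consume loop for the lifetime of the app.
public class KafkaConsumerHostedService : BackgroundService
{
    private const string Topic = "fruit"; // topic created in Step 1

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() blocks, so run the loop on a thread-pool thread
        // to avoid holding up application startup.
        return Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);
    }

    private static void ConsumeLoop(CancellationToken stoppingToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "my-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe(Topic);
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(stoppingToken);
                    Console.WriteLine($"Consumed message: {result.Message.Value}");
                }
                catch (ConsumeException e)
                {
                    // Log and keep consuming instead of ending the loop.
                    Console.WriteLine($"Error consuming message: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the application is shutting down.
        }
        finally
        {
            consumer.Close(); // leave the consumer group cleanly
        }
    }
}
```

If you adopt this approach, the service could be registered in Program.cs with builder.Services.AddHostedService<KafkaConsumerHostedService>();.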

Step 7: Register Services in Program.cs

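Register the Kafka producer service in Program.cs so it can be injected into controllers. The consumer service is not registered in this listing, so it will not start on its own.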

using KafkaExample.Services;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = WebApplication.CreateBuilder(args);

// Register Kafka producer service with Dependency Injection
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();

// Add controllers (required for API endpoints)
builder.Services.AddControllers();

// Add Swagger for API documentation
builder.Services.AddEndpointsApiExplorer(); // For Swagger UI
builder.Services.AddSwaggerGen(); // For Swagger UI

var app = builder.Build();

// Configure Swagger in development environment
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// Use HTTPS Redirection
app.UseHttpsRedirection();

// Configure the HTTP request pipeline to use controllers
app.MapControllers();

app.Run();

Step 8: Create an API Endpoint for Sending Messages

Add a KafkaController.cs in the Controllers folder to handle message requests.

using KafkaExample.Services;
using Microsoft.AspNetCore.Mvc;

namespace KafkaProducerConsumer.Controllers
{
    [ApiController]
    [Route("api/[controller]")]
    public class KafkaController : ControllerBase
    {
        private readonly IKafkaProducerService _producerService;

        public KafkaController(IKafkaProducerService producerService)
        {
            _producerService = producerService;
        }

        [HttpPost("send")]
        public async Task<IActionResult> SendMessage([FromQuery] string topic, [FromQuery] string message)
        {
            if (string.IsNullOrEmpty(topic) || string.IsNullOrEmpty(message))
            {
                return BadRequest("Both 'topic' and 'message' query parameters are required.");
            }

            await _producerService.SendMessageAsync(topic, message);
            return Ok($"Message '{message}' sent successfully to topic '{topic}'.");
        }
    }
}


Explanation:

This API accepts a topic and a message as query parameters and passes them to the Kafka producer.

Your Solution Explorer should now show KafkaController.cs in the Controllers folder, and KafkaProducerService.cs and KafkaConsumerService.cs in the Services folder.

Step 9: Run and Test

1. Start the Kafka and ZooKeeper servers.

2. Run the .NET application:

dotnet run

3. Use a REST client like Postman to send a message:

POST
http://localhost:5292/api/kafka/send?topic=fruit&message=apple
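
The same request can also be sent from code. The following is a minimal sketch using HttpClient; KafkaApiClient, BuildSendUrl and SendAsync are hypothetical helper names, and the base URL is assumed to be whatever address your application prints at startup (http://localhost:5292 in this example).

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

public static class KafkaApiClient
{
    // Builds the request URL, escaping the query values.
    public static string BuildSendUrl(string baseUrl, string topic, string message) =>
        $"{baseUrl.TrimEnd('/')}/api/kafka/send" +
        $"?topic={Uri.EscapeDataString(topic)}&message={Uri.EscapeDataString(message)}";

    // Posts to the endpoint from Step 8 and returns the response body.
    public static async Task<string> SendAsync(HttpClient client, string baseUrl, string topic, string message)
    {
        // The endpoint reads its inputs from the query string, so the body is empty.
        using var response = await client.PostAsync(BuildSendUrl(baseUrl, topic, message), null);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }
}
```

Escaping the values matters once a message contains spaces or characters such as & that would otherwise break the query string.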

Kafka Workflow Recap

  1. The producer sends the message to the fruit topic.
  2. The Kafka broker receives and stores the message.
  3. The consumer reads the message from the topic and processes it.

This demonstrates the basic producer-consumer pattern in Kafka, integrated with a .NET 6 application.

The source code is available on GitHub.

Conclusion

In this blog, we explored Kafka concepts and implemented a real-time producer-consumer application in .NET 6. Kafka is highly scalable, fault-tolerant, and suitable for distributed systems. This example can be extended to include advanced features like message keying, batch processing, error handling, and monitoring.

Happy coding!
