Reading Data from a Kafka Topic in Java Spring Boot Example

In this example, we will be building a sample Kafka Consumer application in Java Spring Boot to read data from a Kafka topic.

An application used to read data from one or more topics in Kafka clusters is referred to as a Consumer application.

Here are the steps, starting from scratch, to create a Kafka consumer in Spring Boot for reading data from a Kafka topic:

Create a Spring Boot Application

  1. Go to the Spring Initializr website at https://start.spring.io.
  2. Create a Spring Boot application with details as follows:
    • Project: Choose the project type (Maven or Gradle).
    • Language: Set the language to Java.
    • Spring Boot: Specify the Spring Boot version. The default selection is the latest stable version of Spring Boot, so you can leave it unchanged.
    • Project Metadata: Enter a Group and Artifact for your project. The Group identifies the organization or namespace that owns the project, usually a reversed domain name such as com.example. The Artifact is the name of the project. Fill in the remaining metadata (name, description, package name) as needed.
    • Packaging: Choose JAR (Java Archive) or WAR (Web Application Archive) depending on how you plan to deploy the application. Choose JAR if you want a standalone executable file, and WAR if you intend to deploy to an external servlet container or Java EE application server. With the Spring Web dependency, a JAR includes an embedded web server (Tomcat by default), so you can run the JAR directly without deploying it to a separate Tomcat server.
    • Java: Select the Java version that matches your project's compatibility requirements and is supported by your target deployment environment.
    • Click on the Add Dependencies button.
    • Choose the following dependencies: Spring Web, Spring for Apache Kafka (Messaging), Lombok, and Spring Boot DevTools.

    Refer to this image for example:

  3. Generate the project:
    • Click on the "Generate" button.
    • Spring Initializr will generate a zip file containing your Spring Boot project.
  4. Download and extract the generated project:
    • Download the zip file generated by Spring Initializr.
    • Extract the contents of the zip file to a directory on your local machine.
  5. Import the project into your IDE:
    • Open your preferred IDE (IntelliJ IDEA, Eclipse, or Spring Tool Suite).
    • Import the extracted project as a Maven or Gradle project, depending on the build system you chose in Spring Initializr.

Add Configurations

Open the src/main/resources/application.properties file in your code editor and add the following configuration lines to the file:

server.port = 8080

kafka.server_endpoint = 127.0.0.1:9092
kafka.topic_name = my-topic-name
kafka.group_id = consumer-1

Here, the server.port = 8080 line specifies the port on which the embedded web server listens for incoming HTTP requests, in this case 8080.

kafka.server_endpoint = 127.0.0.1:9092: This specifies the endpoint or address of the Apache Kafka broker that the Spring Boot application will communicate with. Apache Kafka is designed as a distributed system, and the broker is a central component that manages the messaging between producers and consumers. In this case, the Kafka broker is located at the IP address 127.0.0.1 (which is the loopback address, referring to the local machine) and is listening on port 9092.

kafka.topic_name = my-topic-name: This sets the name of the Kafka topic from which the Spring Boot application will consume messages. In Kafka, a topic is a logical channel or category to which producers publish messages and from which consumers read them. In this case, the application reads messages from a topic named my-topic-name.

kafka.group_id = consumer-1: This specifies the consumer group identifier for your Kafka consumer. In Kafka, consumers that work together to process messages are organized into consumer groups, and the group.id distinguishes one group from another. In this case, your consumer joins the group with the ID consumer-1. Kafka assigns each partition of a topic to exactly one consumer in the group, so the members share the work and each message is delivered to only one of them. Delivery is at-least-once by default, so a message can be delivered again after a failure or rebalance.
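To make the idea of sharing a topic within one consumer group more concrete, here is a minimal sketch in plain Java. It is an illustration only: it uses a simplified round-robin rule rather than Kafka's actual partition assignors, and the class name GroupAssignmentSketch and the instance names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a simplified round-robin assignment,
// not the algorithm Kafka itself uses.
class GroupAssignmentSketch {

  // Spreads partitions 0..partitionCount-1 across the consumers of one group,
  // so every partition has exactly one owner.
  static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    for (String consumer : consumers) {
      assignment.put(consumer, new ArrayList<>());
    }
    if (consumers.isEmpty()) {
      return assignment; // nobody in the group, nothing to assign
    }
    for (int partition = 0; partition < partitionCount; partition++) {
      String owner = consumers.get(partition % consumers.size());
      assignment.get(owner).add(partition);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // Two hypothetical application instances in the same group,
    // reading a topic that is assumed to have three partitions.
    System.out.println(assign(List.of("instance-a", "instance-b"), 3));
    // prints {instance-a=[0, 2], instance-b=[1]}
  }
}
```

If a second application used a different group ID, it would get its own independent assignment and receive every message as well.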

Configure Consumer

Create a Java class named KafkaConsumerConfig and annotate it with the @EnableKafka and @Configuration annotations. This configuration class defines the beans that connect the Kafka consumer application to the Kafka server: a consumer factory holding the connection settings, and a listener container factory that uses it:

package com.example.kafka.consumer.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
  @Value(value = "${kafka.server_endpoint}")
  private String kafkaServerEndpoint;

  @Value(value = "${kafka.group_id}")
  private String kafkaGroupId;


  @Bean
  public ConsumerFactory<String, String> consumerFactory() {

    Map<String, Object> map = new HashMap<>();

    map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerEndpoint);
    map.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
    map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(map);
  }

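  // Named kafkaListenerContainerFactory because @KafkaListener looks up
  // a container factory bean with that name by default.
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }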

}
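The ConsumerConfig constants used above are plain string keys. As a rough sketch, the same settings could be written out with the literal Kafka property names, which may help when comparing the code with the Kafka documentation. The class name ConsumerPropertiesSketch is hypothetical, and the auto.offset.reset entry is an optional addition that is not part of the original configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing the literal property names behind the
// ConsumerConfig constants. It does not connect to Kafka by itself.
class ConsumerPropertiesSketch {

  static Map<String, Object> consumerProperties(String bootstrapServers, String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", bootstrapServers); // BOOTSTRAP_SERVERS_CONFIG
    props.put("group.id", groupId);                   // GROUP_ID_CONFIG
    // Deserializers may be given as class names instead of Class objects.
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    // Assumption, not in the original example: start from the earliest
    // available offset when the group has no committed offset yet.
    // The Kafka default is "latest".
    props.put("auto.offset.reset", "earliest");
    return props;
  }

  public static void main(String[] args) {
    System.out.println(consumerProperties("127.0.0.1:9092", "consumer-1"));
  }
}
```

With the default of latest, a brand-new consumer group only sees messages produced after it starts, which can be surprising when testing against a topic that already contains data.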

Add a Kafka Listener Method

In your project's main application class, create a Kafka listener method. This method should accept a single String parameter and be annotated with @KafkaListener(topics = "${kafka.topic_name}", groupId = "${kafka.group_id}"). The ${...} placeholders are resolved from application.properties, and the @KafkaListener annotation designates the method as a Kafka message listener for the specified topic:

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class ExampleKafkaApplication {
  private static final Logger LOGGER = LoggerFactory.getLogger(ExampleKafkaApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(ExampleKafkaApplication.class, args);
  }

  @KafkaListener(topics = "${kafka.topic_name}", groupId = "${kafka.group_id}")
  public void listener(String message) {
    LOGGER.info("Received message : {}", message);
    // Implement your logic here
  }

}
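One possible way to fill in the "Implement your logic here" part is to delegate to a small plain Java class, so that the processing rules can be unit-tested without a running broker. The following is only a sketch under that assumption. The class MessageHandlerSketch and its rule of skipping blank messages are hypothetical and not part of the original example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: keeps message handling separate from the Kafka listener.
// Not thread-safe, so it assumes the listener is invoked from a single thread.
class MessageHandlerSketch {
  private final List<String> processed = new ArrayList<>();

  // Returns true if the message was accepted, false if it was skipped
  // because it was null or blank.
  boolean handle(String message) {
    if (message == null || message.trim().isEmpty()) {
      return false;
    }
    processed.add(message.trim());
    return true;
  }

  // Read-only view of the accepted messages, in arrival order.
  List<String> processed() {
    return Collections.unmodifiableList(processed);
  }

  public static void main(String[] args) {
    MessageHandlerSketch handler = new MessageHandlerSketch();
    handler.handle(" hello ");
    handler.handle("   ");
    System.out.println(handler.processed()); // prints [hello]
  }
}
```

The listener method would then contain a single call such as handler.handle(message), with the handler created or injected in the application class.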

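Build and Run Your Spring Boot Application

Build and start the application from your IDE or from the project directory, for example with ./mvnw spring-boot:run for a Maven project or ./gradlew bootRun for a Gradle project. Then test the consumer by sending data to the Kafka topic. Each message is logged to the console as soon as it arrives on the topic.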