Reading Data from a Kafka Topic in Java Spring Boot Example

In this example, we will build a sample Kafka Consumer application in Java Spring Boot to read data from a Kafka topic.

An application that is used to read/consume streams of data from one or more Kafka topics is called a Consumer application.

Kafka Dependency for Spring Boot

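For Maven


    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>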

For Gradle


    implementation 'org.springframework.kafka:spring-kafka'

The version is omitted because Spring Boot's dependency management supplies a compatible one. Other versions of Spring Kafka are listed in the Maven Repository.

Follow the steps below to complete this example:

Create a Spring Boot Application

  1. Go to Spring Initializr at https://start.spring.io and create a Spring Boot application with details as follows:
    • Project: Choose Gradle Project or Maven Project.
    • Language: Java
    • Spring Boot: The latest stable version is selected by default, so leave it as is.
    • Project Metadata: Enter a group name in the Group field (typically your organization's reversed domain, such as com.example). In the Artifact field, enter the name of your project. In the Package name field, enter the base package for your project. Then select a Java version that is installed on your computer.
    • Dependencies: Add dependencies for Spring Web, Spring Boot DevTools, and Spring for Apache Kafka.

    [Image: Read data from Kafka topic Java Spring Boot example]
  2. Click the GENERATE button and save/download the project zip bundle.
  3. Extract the project to your preferred working directory.
  4. Import the project in your preferred Java development IDE such as Eclipse or IntelliJ IDEA.

Add Application Configurations

Open the application.properties file and add the following configuration. Replace the values with those that apply to your environment.

kafka-consumer-sample/src/main/resources/application.properties

    server.port = 8081

    kafka.server_endpoint = 127.0.0.1:9092
    kafka.topic_name = my-topic
    kafka.group_id = consumer-1
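
To check that the keys above parse as expected, the same content can be loaded with the JDK's java.util.Properties. This is only a minimal sketch that runs outside Spring: the class name PropertiesCheck and the inline copy of the file are assumptions made for illustration, and Spring Boot performs its own loading and placeholder resolution at startup.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration; it is not part of the sample project.
final class PropertiesCheck {

    // Inline copy of the application.properties content shown above.
    static final String SAMPLE = String.join("\n",
            "server.port = 8081",
            "kafka.server_endpoint = 127.0.0.1:9092",
            "kafka.topic_name = my-topic",
            "kafka.group_id = consumer-1");

    // Parses properties text; whitespace around '=' is dropped by Properties.load.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not parse properties", e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println("Broker: " + props.getProperty("kafka.server_endpoint"));
        System.out.println("Topic:  " + props.getProperty("kafka.topic_name"));
        System.out.println("Group:  " + props.getProperty("kafka.group_id"));
    }
}
```

The spaces around the equals sign are harmless here because the properties format ignores them, so the values arrive without leading whitespace.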
    

Creating Consumer Configuration Class

Create a KafkaConsumerConfig.java class and annotate it with @EnableKafka and @Configuration. This configuration class defines the beans that connect the Kafka Consumer application to the Kafka server.

kafka-consumer-sample/src/main/java/com/example/kafka/consumer/config/KafkaConsumerConfig.java

    package com.example.kafka.consumer.config;

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {

        @Value("${kafka.server_endpoint}")
        private String kafkaServerEndpoint;

        @Value("${kafka.group_id}")
        private String kafkaGroupId;

        // Creates consumers that read both keys and values as strings.
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();

            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerEndpoint);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            return new DefaultKafkaConsumerFactory<>(props);
        }

        // Named kafkaListenerContainerFactory because @KafkaListener looks up
        // a container factory bean with that name by default.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
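
The ConsumerConfig constants used above are plain string keys from the Kafka consumer configuration. The sketch below builds the same four settings with the JDK only, so it runs without Kafka on the classpath. The class name ConsumerProps and its build method are hypothetical, and the deserializer is given by its fully qualified class name rather than as a Class object, which Kafka also accepts for class-typed settings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; the configuration class above uses ConsumerConfig constants instead.
final class ConsumerProps {

    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Builds the consumer settings using the literal keys behind the constants.
    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);      // BOOTSTRAP_SERVERS_CONFIG
        props.put("group.id", groupId);                        // GROUP_ID_CONFIG
        props.put("key.deserializer", STRING_DESERIALIZER);    // KEY_DESERIALIZER_CLASS_CONFIG
        props.put("value.deserializer", STRING_DESERIALIZER);  // VALUE_DESERIALIZER_CLASS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        // HashMap iteration order is unspecified, so the print order may vary.
        build("127.0.0.1:9092", "consumer-1")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Using the constants, as the configuration class does, is usually preferable because a mistyped key then fails at compile time instead of being silently ignored.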

Add a Kafka Listener Method

Go to your main Application class and create a Kafka listener method. The listener method takes a single String parameter and is annotated with @KafkaListener(topics = "topic-name", groupId = "group-id"). The @KafkaListener annotation marks a method as the target of a Kafka message listener on the specified topics.

kafka-consumer-sample/src/main/java/com/example/kafka/consumer/KafkaConsumerSampleApplication.java

    package com.example.kafka.consumer;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;

    @SpringBootApplication
    public class KafkaConsumerSampleApplication {

        private static final Logger logger =
                LoggerFactory.getLogger(KafkaConsumerSampleApplication.class);

        public static void main(String[] args) {
            SpringApplication.run(KafkaConsumerSampleApplication.class, args);
        }

        // The topic name and group id are resolved from application.properties.
        @KafkaListener(topics = "${kafka.topic_name}", groupId = "${kafka.group_id}")
        public void listener(String message) {
            logger.info("Received message = {}", message);
        }
    }

Here, the Kafka listener method is invoked each time a new message arrives on the specified Kafka topic. Inside this method, you can parse the received string and persist it in a database or process it further.
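
As one possible way to parse the received string, the sketch below assumes a made-up message format of semicolon-separated key=value pairs, such as "id=42;status=shipped". Both the format and the MessageParser class are hypothetical; the actual parsing depends on what your producer sends.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: assumes messages look like "id=42;status=shipped".
final class MessageParser {

    // Splits the message into fields, keeping them in the order they appear.
    static Map<String, String> parse(String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (message == null || message.trim().isEmpty()) {
            return fields;
        }
        for (String pair : message.split(";")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                continue; // no '=' in this fragment, so skip it instead of failing
            }
            String key = pair.substring(0, eq).trim();
            if (key.isEmpty()) {
                continue; // ignore fragments without a key
            }
            fields.put(key, pair.substring(eq + 1).trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("id=42;status=shipped")); // prints {id=42, status=shipped}
    }
}
```

Inside the listener method, a call such as MessageParser.parse(message) would then return the fields for further processing. Skipping malformed fragments rather than throwing is a design choice here, so that one bad message does not interrupt the listener.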

Run and Test the Application

  1. Start your Kafka server.
  2. Check that the topic specified in the consumer application exists on your Kafka server. If it doesn't, create it.
  3. Run your Consumer application.
  4. Send data to the specified Kafka topic either using a Producer application or a console Producer.
  5. You should see the data in the Consumer's console as soon as new data arrives in the specified topic.

Summary

Congratulations! You have learned how to build a Kafka Consumer application in Java Spring Boot.