Writing Data to a Kafka Topic in Java Spring Boot Example

In this example, we will build a sample Kafka Producer application in Java Spring Boot to send data to a Kafka topic via a REST API.

An application that is used to send data to one or more topics in the Kafka clusters is called a Producer application.

Kafka Dependency for Spring Boot

For Maven


    

For Gradle


    implementation 'org.springframework.kafka:spring-kafka'

Find the other versions of Spring Kafka in the Maven Repository.

Follow the steps below to complete this example:

Create a Spring Boot Application

  1. Go to Spring Initializr at https://start.spring.io and create a Spring Boot application with details as follows:
    • Project: Choose Gradle Project or Maven Project.
    • Language: Java
    • Spring Boot: Latest stable version of Spring Boot is selected by default. So leave it as is.
    • Project Metadata: Provide group name in the Group field. The group name is the id of the project. In Artifact field, provide the name of your project. In the package field, provide package name for your project. Next, select your preferred version of Java that is installed on your computer and is available on your local environment.
    • Dependencies: Add dependencies for Spring Web, Spring Boot DevTools, and Spring for Apache Kafka.

    Refer to the image below for example:

    Write data to Kafka topic Java Spring Boot example
  2. Click the GENERATE button and save/download the project zip bundle.
  3. Extract the project to your preferred working directory.
  4. Import the project in your preferred Java development IDE such as Eclipse or IntelliJ IDEA.

Add Application Configurations

Open the application.properties file and add the following configuration to it. Do not forget to replace the configuration values that is relevant to your project.

kafka-producer-sample/src/main/resources/application.properties

    server.port = 8080

    kafka.server_endpoint = 127.0.0.1:9092
    kafka.topic_name = my-topic
    

Create a Data Transfer Object Class

Create a Message.java Java class. This class should only contains getter/setter methods with serialization and deserialization mechanism but should not contain any business logic.

kafka-producer-sample/src/main/java/com/example/kafka/producer/dto/Message.java

    package com.example.kafka.producer.dto;

public class Message {

    private String content;
    private String senderId;

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getSenderId() {
        return senderId;
    }

    public void setSenderId(String senderId) {
        this.senderId = senderId;
    }

}
    

Create Producer Configuration Class

Create a KafkaProducerConfig.java Java class and annotate it with @Configuration annotation. This configuration class should contain methods to establish connection between the Kafka Producer application and the Kafka Server.

kafka-producer-sample/src/main/java/com/example/kafka/producer/config/KafkaProducerConfig.java

    package com.example.kafka.producer.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.example.kafka.producer.dto.Message;

@Configuration
public class KafkaProducerConfig {
    
    @Value(value = "${kafka.server_endpoint}")
    private String kafkaServerEndpoint;

    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerEndpoint);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
    

Create a Web Controller

Create a KafkaProducerController.java java class with a POST method to send data to the Kafka topic.

kafka-producer-sample/src/main/java/com/example/kafka/producer/controller/KafkaProducerController.java

    package com.example.kafka.producer.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.kafka.producer.dto.Message;

@RestController
@RequestMapping(path = "/producer")
public class KafkaProducerController {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerController.class);

    @Autowired
    private KafkaTemplate<String, Message> kafkaTemplate;

    @Value(value = "${kafka.topic_name}")
    private String kafkaTopicName;

    String status = "";

    @PostMapping(path = "/messages")
    public ResponseEntity<String> sendMessage(@RequestBody  Message message) {
        logger.info("sending message {}", message);

        ListenableFuture<SendResult<String, Message>> future =
                this.kafkaTemplate.send(kafkaTopicName, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {

            @Override
            public void onSuccess(SendResult<String, Message> result) {
                status = "Message sent successfully";
                logger.info("successfully sent message = {}, with offset = {}", message,
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.info("Failed to send message = {}, error = {}", message, ex.getMessage());
                status = "Message sending failed";
            }
        });
        return ResponseEntity.ok(status);
    }

}

Run and Test Your Application

Note: Your Kafka Server should be running to be able to receive and store messages in a Kafka topic.

Run your Producer application and after it starts successfully, send JSON POST requests to http://localhost:8080/producer/messagesusing any HTTP requester tool.

Send this JSON data in the POST request's body:


    {
    "content": "This is example 1",
    "id": 1
    }
    

When your Producer application receives this message, it will send this data to the Kafka topic that you've specified in your application.properties file.

Next, we recommend you to build a Consumer application that can read data from a Kafka topic.

Summary

Congratulations! you have learned how to build a Kafka Producer application.