Write Data to Amazon Kinesis Data Streams Using KPL in Java Spring Boot Example

This tutorial shows how to write data to Amazon Kinesis Data Streams from a Java Spring Boot application. To do this, we need a producer application, which is an application that writes data into Kinesis Data Streams.

In this tutorial, we will build a producer application using the KPL (Kinesis Producer Library). The KPL acts as an intermediary between the producer application and the Kinesis Data Streams API. It simplifies producer development by providing a configurable, automatic retry mechanism and by batching: it can write multiple records to multiple shards in a single request using operations such as PutRecords. (A shard is a uniquely identified sequence of data records in a stream.)
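To make the batching idea concrete, here is a minimal sketch in plain Java that collects records and releases them in groups. It is not how the KPL is implemented; RecordBatcher and maxBatchSize are hypothetical names used only for illustration. The KPL handles this internally, along with time-based flushing and retries, so the application in this tutorial never needs code like this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of size-based batching; not part of the KPL API.
class RecordBatcher {
    private final int maxBatchSize;
    private List<String> buffer = new ArrayList<>();

    RecordBatcher(int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        this.maxBatchSize = maxBatchSize;
    }

    // Buffers one record; returns a full batch once maxBatchSize is reached.
    Optional<List<String>> add(String record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            return Optional.of(drain());
        }
        return Optional.empty();
    }

    // Returns whatever is currently buffered (possibly empty) and resets the buffer.
    List<String> flush() {
        return drain();
    }

    private List<String> drain() {
        List<String> batch = buffer;
        buffer = new ArrayList<>();
        return batch;
    }
}
```

A caller would send each returned batch in one request and call flush() before shutting down, which is roughly the role flushSync() plays in the KPL.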

Follow the steps below to build this Kinesis sample Producer application:

Create a Spring Boot Application

  1. Go to Spring Initializr at https://start.spring.io and create a Spring Boot application with details as follows:
    • Project: Choose Gradle Project or Maven Project.
    • Language: Java
    • Spring Boot: The latest stable version of Spring Boot is selected by default, so leave it as is.
    • Project Metadata: Enter a group name in the Group field; this becomes the group ID of the project. In the Artifact field, enter the name of your project. In the Package name field, enter the base package for your project. Then select the Java version that is installed on your computer.
    • Dependencies: Add dependencies for Spring Web, and Spring Boot DevTools.

  2. Click the GENERATE button and save/download the project zip bundle.
  3. Extract the project to your preferred working directory.
  4. Import the project to your preferred Java development IDE such as Eclipse or IntelliJ IDEA.

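Based on the file paths used in this tutorial, the finished project contains the following files (the generated main application class and build wrapper files are omitted):

    kinesis-producer-sample/
      build.gradle (or pom.xml)
      src/main/resources/
        application.properties
      src/main/java/com/sample/kinesis/producer/
        controller/ProducerController.java
        model/TrackDetail.java
        service/ProducerService.java
        service/ProducerServiceImpl.java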

Add Dependency

Add KPL dependency to the project. Find the latest version of the KPL in the Maven Repository.

For Gradle

Open the default build.gradle file and add the gradle dependency for KPL:

kinesis-producer-sample/build.gradle

   implementation 'com.amazonaws:amazon-kinesis-producer:0.14.1'

For Maven

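Add the following KPL dependency to the pom.xml file:

kinesis-producer-sample/pom.xml

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-producer</artifactId>
        <version>0.14.1</version>
    </dependency>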

Configure Kinesis Stream Data (AWS Console)

  1. Log in to the AWS Management Console and open the Amazon Kinesis console at https://console.aws.amazon.com/kinesis.
  2. Choose Kinesis Data Streams and click Create data stream.
  3. On the Create Data Stream page, do the following:
    • Type your Kinesis data stream name in the Data stream name field. For our example: tutorialsbuddy-stream.
    • Type the number of shards in the Number of open shards field. For our example: 1. Cost increases with the number of shards.
    • Choose Create data stream.

Add Application Configurations

Open the application.properties file and add the following configuration to it. Do not forget to replace the configuration values with the ones relevant to your project.

kinesis-producer-sample/src/main/resources/application.properties

#port on which the application would run
server.port = 8080

#use your aws credentials here
aws.access_key = IHJKSEKANSKM3DTA
aws.secret_key = KLdLJdfKGdkwWFRfsFSvNw8WEfpOmGDkS
aws.region = us-east-1

#use your stream name that you have created
aws.stream_name = tutorialsbuddy-stream
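
Spring Boot reads this file on its own, so no parsing code is needed in the application. Purely to illustrate the key = value format and a fail-fast check for missing values, here is a small sketch using java.util.Properties. ConfigSketch, parse, and require are hypothetical names and are not used elsewhere in this tutorial.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper showing how "key = value" lines are interpreted.
class ConfigSketch {

    // Parses properties text; lines starting with '#' are treated as comments.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the trimmed value, or fails fast when the key is missing or blank.
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value.trim();
    }
}
```

For example, ConfigSketch.require(ConfigSketch.parse("aws.region = us-east-1"), "aws.region") returns the region without the spaces around the equals sign.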
    

Create Producer Service Interface

Create a ProducerService.java interface with two methods:

  1. public void putDataIntoKinesis(String payload) throws Exception - takes string data as input and sends it to the Kinesis data stream.
  2. public void stop() - stops sending data to the Kinesis data stream.

kinesis-producer-sample/src/main/java/com/sample/kinesis/producer/service/ProducerService.java

package com.sample.kinesis.producer.service;

public interface ProducerService {

  public void putDataIntoKinesis(String payload) throws Exception;
  public void stop();

}
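
One benefit of putting the service behind an interface is that the controller can be exercised without a real Kinesis stream. The sketch below is a hypothetical in-memory stand-in, not part of the tutorial's project; the interface is repeated (without its package) only so that the example compiles on its own, and InMemoryProducerService and sentPayloads are invented names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Copy of the tutorial's interface so this sketch is self-contained.
interface ProducerService {
    void putDataIntoKinesis(String payload) throws Exception;
    void stop();
}

// Hypothetical test double that records payloads instead of sending them.
class InMemoryProducerService implements ProducerService {
    private final List<String> payloads = new ArrayList<>();
    private boolean stopped = false;

    @Override
    public synchronized void putDataIntoKinesis(String payload) {
        if (stopped) {
            throw new IllegalStateException("Producer has been stopped");
        }
        payloads.add(payload);
    }

    @Override
    public synchronized void stop() {
        stopped = true;
    }

    // Returns a read-only snapshot of everything "sent" so far.
    synchronized List<String> sentPayloads() {
        return Collections.unmodifiableList(new ArrayList<>(payloads));
    }
}
```

A unit test could pass this object wherever a ProducerService is expected and then assert on sentPayloads().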
    

Create Service Implementation Class

Create a ProducerServiceImpl.java class and implement the methods of the ProducerService interface.

kinesis-producer-sample/src/main/java/com/sample/kinesis/producer/service/ProducerServiceImpl.java

package com.sample.kinesis.producer.service;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

@Service
public class ProducerServiceImpl implements ProducerService {

    private static final Logger LOG = LoggerFactory.getLogger(ProducerServiceImpl.class);

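    private final String streamName;
    private final String awsRegion;
    private final String awsAccessKey;
    private final String awsSecretKey;

    private KinesisProducer kinesisProducer = null;


    // The number of records that have finished (either successfully put, or failed)
    final AtomicLong completed = new AtomicLong(0);


    // Computed once when the class is loaded, so every record shares the same
    // partition key and lands on the same shard. That is fine for the
    // single-shard stream used in this example.
    private static final String TIMESTAMP_AS_PARTITION_KEY =
            Long.toString(System.currentTimeMillis());


    // The values are injected through the constructor because field-level @Value
    // injection happens only after the constructor has run, which would leave the
    // credentials and region null at the moment the producer is created.
    public ProducerServiceImpl(@Value("${aws.stream_name}") String streamName,
            @Value("${aws.region}") String awsRegion,
            @Value("${aws.access_key}") String awsAccessKey,
            @Value("${aws.secret_key}") String awsSecretKey) {
        this.streamName = streamName;
        this.awsRegion = awsRegion;
        this.awsAccessKey = awsAccessKey;
        this.awsSecretKey = awsSecretKey;
        this.kinesisProducer = getKinesisProducer();
    }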

    private KinesisProducer getKinesisProducer() {
        if (kinesisProducer == null) {

            BasicAWSCredentials awsCreds = new BasicAWSCredentials(awsAccessKey, awsSecretKey);

            KinesisProducerConfiguration config = new KinesisProducerConfiguration();
            config.setRegion(awsRegion);
            config.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
            config.setMaxConnections(1);
            config.setRequestTimeout(6000); // 6 seconds
            config.setRecordMaxBufferedTime(5000); // 5 seconds

            kinesisProducer = new KinesisProducer(config);
        }

        return kinesisProducer;
    }

    @Override
    public void putDataIntoKinesis(String payload) throws Exception {

        FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {

            @Override
            public void onFailure(Throwable t) {

                // Log the last attempt (and the one before it, if any) when the KPL
                // reports a failed record. The type is checked before casting so that
                // other exceptions do not cause a ClassCastException.
                if (t instanceof UserRecordFailedException) {
                    UserRecordResult result = ((UserRecordFailedException) t).getResult();
                    int lastIndex = result.getAttempts().size() - 1;
                    if (lastIndex >= 1) {
                        Attempt last = result.getAttempts().get(lastIndex);
                        Attempt previous = result.getAttempts().get(lastIndex - 1);
                        LOG.error("Failed to put record - {} : {}. Previous failure - {} : {}",
                                last.getErrorCode(), last.getErrorMessage(),
                                previous.getErrorCode(), previous.getErrorMessage());
                    } else if (lastIndex == 0) {
                        Attempt last = result.getAttempts().get(0);
                        LOG.error("Failed to put record - {} : {}.",
                                last.getErrorCode(), last.getErrorMessage());
                    }
                }
                LOG.error("Exception during put", t);

                // A failed record has also finished, so count it as completed.
                completed.getAndIncrement();
            }

            @Override
            public void onSuccess(UserRecordResult result) {

                long totalTime = result.getAttempts().stream()
                        .mapToLong(a -> a.getDelay() + a.getDuration()).sum();

                LOG.info("Data writing success. Total time taken to write data = {}", totalTime);

                completed.getAndIncrement();
            }
        };


        // UTF-8 is always available, so encode directly. Any failure propagates to
        // the caller through the method's "throws Exception" clause instead of
        // leaving the buffer null.
        ByteBuffer data = ByteBuffer.wrap(payload.getBytes("UTF-8"));

        // Backpressure: pause while more than 10,000 records are waiting to be sent
        while (kinesisProducer.getOutstandingRecordsCount() > 1e4) {
            Thread.sleep(1);
        }

        // write data to Kinesis stream
        ListenableFuture<UserRecordResult> f =
                kinesisProducer.addUserRecord(streamName, TIMESTAMP_AS_PARTITION_KEY, data);

        // The callback only logs, so run it on the thread that completes the
        // future instead of creating a new thread pool on every call.
        Futures.addCallback(f, myCallback,
                com.google.common.util.concurrent.MoreExecutors.directExecutor());

    }

    @Override
    public void stop() {
        if (kinesisProducer != null) {
            kinesisProducer.flushSync();
            kinesisProducer.destroy();
        }

    }

}
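
The service above passes the same partition key with every record. Kinesis maps a partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and finding the shard whose hash key range contains it. The sketch below illustrates that mapping under a simplifying assumption: the stream's shards divide the hash space evenly. ShardSketch, hashKey, and shardIndex are hypothetical names, and a real stream's ranges should be read from the stream description rather than computed like this.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical illustration of partition key hashing; not part of the KPL API.
class ShardSketch {
    // Size of the 128-bit hash key space (2^128).
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the UTF-8 partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Assumption: shardCount shards split the hash space into equal ranges.
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValue();
    }
}
```

With one shard, every key maps to index 0, which is why a fixed key is harmless in this tutorial. With more shards, a key that varies per record (for example, the vehicleId) would spread the load.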
    

Create a Model Class

Create a TrackDetail.java class with getter and setter methods. This class carries the request data sent by the client to the server.

kinesis-producer-sample/src/main/java/com/sample/kinesis/producer/model/TrackDetail.java

package com.sample.kinesis.producer.model;

public class TrackDetail {
  private String vehicleId;
  private String driverId;
  private String driverName;
   
  public String getVehicleId() {
    return vehicleId;
  }
 
  public void setVehicleId(String vehicleId) {
    this.vehicleId = vehicleId;
  }
 
  public String getDriverId() {
    return driverId;
  }
 
  public void setDriverId(String driverId) {
    this.driverId = driverId;
  }
 
  public String getDriverName() {
    return driverName;
  }
 
  public void setDriverName(String driverName) {
    this.driverName = driverName;
  }
 
}
    

Add a Producer Controller

Create a ProducerController.java class and add the following code to it. We annotate this class with @RestController to make it a REST controller and with @RequestMapping(value = "/api") to map requests to the /api path. Inside this controller, we create a POST method with the path /stream, so the full endpoint for uploading data to the Kinesis data stream is /api/stream.

kinesis-producer-sample/src/main/java/com/sample/kinesis/producer/controller/ProducerController.java

package com.sample.kinesis.producer.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sample.kinesis.producer.service.ProducerService;
import com.sample.kinesis.producer.model.TrackDetail;
   
@RestController
@RequestMapping(value = "/api")
public class ProducerController {
   @Autowired
   private ProducerService producerService;

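   @PostMapping(value = "/stream")
   public ResponseEntity<String> putIntoKinesis(@RequestBody TrackDetail trackDetail) {
        ObjectMapper mapper = new ObjectMapper();
        try {
             String data = mapper.writeValueAsString(trackDetail);
             producerService.putDataIntoKinesis(data);
        } catch (JsonProcessingException e) {
           e.printStackTrace();
           // Report the failure instead of returning a success response
           return ResponseEntity.status(500).body("Failed to serialize the request body.");
        } catch (Exception e) {
           e.printStackTrace();
           return ResponseEntity.status(500).body("Failed to send data to Kinesis.");
        }
        // The KPL sends records asynchronously, so at this point the record has
        // been handed to the producer but may not have reached the stream yet.
        return ResponseEntity.ok("Data queued for Kinesis successfully!");
   }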
  
}
    

Run and Test the Kinesis Producer Application

Run the producer application and send a POST request to http://localhost:8080/api/stream using Postman or any similar HTTP client, with a JSON body containing the vehicleId, driverId, and driverName fields.
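
As an alternative to Postman, the request can be sent from Java itself. The following is a minimal sketch using the java.net.http client, which requires Java 11 or later. StreamClientSketch, buildRequest, and send are hypothetical names, and the sample values in the usage note are made up; only the /api/stream path and the JSON field names come from this tutorial.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the /api/stream endpoint defined in this tutorial.
class StreamClientSketch {

    // Builds a JSON POST request for the producer endpoint.
    static HttpRequest buildRequest(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/stream"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Sends the request and returns the HTTP status code of the response.
    static int send(String baseUrl, String json) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(baseUrl, json), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

With the application running locally, a call such as StreamClientSketch.send("http://localhost:8080", "{\"vehicleId\":\"V-100\",\"driverId\":\"D-7\",\"driverName\":\"Sam\"}") should return the status code produced by the controller.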

Summary

Congratulations! You have learned how to create a producer application that sends data to a Kinesis Data Stream. As a next step, we recommend learning how to read data from a Kinesis Data Stream using a consumer application.