Write Data to Amazon Kinesis Data Streams Using KPL in Java Spring Boot Example
This tutorial shows how to write data to Amazon Kinesis Data Streams from a Java Spring Boot application. Writing to a stream requires a producer application, which is any application that puts data records into Kinesis Data Streams.
In this tutorial, we will build a producer application using the KPL (Kinesis Producer Library). The KPL acts as an interface between a producer application and the Kinesis Data Streams API. It simplifies producer development by providing configurable buffering and an automatic retry mechanism, and it can write multiple records to multiple shards in a single request using operations such as PutRecords. (A shard is a uniquely identified sequence of data records in a stream.)
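To make the idea of writing multiple records per request more concrete, here is a minimal sketch of splitting a list of records into request-sized groups. It is only an illustration and not how the KPL is implemented internally; the class name BatchingSketch and the method toBatches are hypothetical. The limit of 500 reflects the maximum number of records that a single PutRecords call accepts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of request batching; the KPL does this for you.
class BatchingSketch {
    // A single PutRecords request accepts at most 500 records.
    static final int MAX_RECORDS_PER_REQUEST = 500;

    // Splits the records into consecutive batches of at most maxPerBatch items.
    static <T> List<List<T>> toBatches(List<T> records, int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, records.size());
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            records.add(i);
        }
        // Each inner list could be sent in one request.
        for (List<Integer> batch : toBatches(records, MAX_RECORDS_PER_REQUEST)) {
            System.out.println("Batch size: " + batch.size());
        }
    }
}
```

With the KPL, none of this has to be written by hand: records passed to the producer are buffered and grouped automatically.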
Follow the steps below to build this Kinesis sample Producer application:
Create a Spring Boot Application
- Go to Spring Initializr at https://start.spring.io and create a Spring Boot application with details as follows:
- Project: Choose Gradle Project or Maven Project.
- Language: Java
- Spring Boot: The latest stable version of Spring Boot is selected by default, so leave it as is.
- Project Metadata: In the Group field, provide a group name, which identifies the organization or group that owns the project. In the Artifact field, provide the name of your project. In the Package name field, provide the base package for your project. Next, select a Java version that is installed on your computer.
- Dependencies: Add dependencies for Spring Web and Spring Boot DevTools.
- Click the GENERATE button and save/download the project zip bundle.
- Extract the project to your preferred working directory.
- Import the project to your preferred Java development IDE such as Eclipse or IntelliJ IDEA.
Refer to the image below for example:

The final project structure of our sample application will look something like this after completion in a hierarchical package presentation view:

Add Dependency
Add the KPL dependency to the project. You can find the latest version of the KPL in the Maven Repository.
For Gradle
Open the default build.gradle file and add the Gradle dependency for the KPL:
implementation 'com.amazonaws:amazon-kinesis-producer:0.14.1'
For Maven
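Add the following KPL dependency to the pom.xml file:
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.14.1</version>
</dependency>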
Configure Kinesis Stream Data (AWS Console)
- Log in to the AWS Management Console and open the Amazon Kinesis Streams Management Console at https://console.aws.amazon.com/kinesis.
- Choose Kinesis Data Streams and click on Create data stream as shown in the image below:
- On the Create Data Stream page, do the following:

Add Application Configurations
Open the application.properties file and add the following configuration to it. Do not forget to replace the configuration values with those that are relevant to your project.
#port on which the application would run
server.port = 8080
#use your aws credentials here
aws.access_key = IHJKSEKANSKM3DTA
aws.secret_key = KLdLJdfKGdkwWFRfsFSvNw8WEfpOmGDkS
aws.region = us-east-1
#use your stream name that you have created
aws.stream_name = tutorialsbuddy-stream
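Before starting the application, it can help to confirm that every key the producer expects is present. The following is a minimal, framework-free sketch of such a check using java.util.Properties. The class name ConfigCheck and the method missingKeys are hypothetical and not part of the sample project; only the four key names come from the configuration above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reports which required keys are absent or blank.
class ConfigCheck {
    static final List<String> REQUIRED_KEYS =
            List.of("aws.access_key", "aws.secret_key", "aws.region", "aws.stream_name");

    // Parses text in .properties format and returns the required keys that have no value.
    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Sample input with the credentials left out on purpose.
        String sample = "aws.region = us-east-1\naws.stream_name = tutorialsbuddy-stream\n";
        System.out.println("Missing keys: " + missingKeys(sample));
    }
}
```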
Create Producer Service Interface
Create a ProducerService.java interface with two methods:
- public void putDataIntoKinesis(String payload) - takes string data as an input and sends it to Kinesis data stream.
- public void stop() - stops the sending of data to Kinesis data stream.
package com.sample.kinesis.producer.service;
public interface ProducerService {
public void putDataIntoKinesis(String payload) throws Exception;
public void stop();
}
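One benefit of putting the producer behind an interface is that it can be swapped for a stand-in during tests, so that code which depends on it can be exercised without AWS access. The following is a minimal sketch of such a stand-in. The class InMemoryProducerService and its method sentPayloads are hypothetical and not part of the sample project, and the interface is repeated only so that the sketch compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Repeated here only so that the sketch is self-contained.
interface ProducerService {
    void putDataIntoKinesis(String payload) throws Exception;

    void stop();
}

// Hypothetical test double: collects payloads in memory instead of calling AWS.
class InMemoryProducerService implements ProducerService {
    private final List<String> sent = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean stopped = false;

    @Override
    public void putDataIntoKinesis(String payload) {
        if (stopped) {
            throw new IllegalStateException("Producer has been stopped");
        }
        sent.add(payload);
    }

    @Override
    public void stop() {
        stopped = true;
    }

    // Returns a copy of everything that was "sent" so far.
    List<String> sentPayloads() {
        return new ArrayList<>(sent);
    }
}
```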
Create Service Implementation Class
Create a ProducerServiceImpl.java class and implement the methods of the ProducerService interface.
package com.sample.kinesis.producer.service;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@Service
public class ProducerServiceImpl implements ProducerService {
private static final Logger LOG = LoggerFactory.getLogger(ProducerServiceImpl.class);
@Value(value = "${aws.stream_name}")
private String streamName;
@Value(value = "${aws.region}")
private String awsRegion;
@Value(value = "${aws.access_key}")
private String awsAccessKey;
@Value(value = "${aws.secret_key}")
private String awsSecretKey;
private KinesisProducer kinesisProducer = null;
// The number of records that have finished (either successfully put, or failed)
final AtomicLong completed = new AtomicLong(0);
private static final String TIMESTAMP_AS_PARTITION_KEY =
Long.toString(System.currentTimeMillis());
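// Field-level @Value properties are injected only after the constructor returns,
// so the values needed to build the producer are injected as constructor parameters.
public ProducerServiceImpl(@Value("${aws.region}") String region,
@Value("${aws.access_key}") String accessKey,
@Value("${aws.secret_key}") String secretKey) {
this.kinesisProducer = createKinesisProducer(region, accessKey, secretKey);
}
private static KinesisProducer createKinesisProducer(String region, String accessKey,
String secretKey) {
BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion(region);
config.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
config.setMaxConnections(1);
config.setRequestTimeout(6000); // 6 seconds
config.setRecordMaxBufferedTime(5000); // 5 seconds
return new KinesisProducer(config);
}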
// Shared pool for running callbacks; created once instead of once per call.
private final ExecutorService callbackThreadPool = Executors.newCachedThreadPool();
@Override
public void putDataIntoKinesis(String payload) throws Exception {
FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
@Override
public void onFailure(Throwable t) {
// Log the last attempt and, if the record was retried, the attempt before it.
// The cast happens only after the instanceof check to avoid a ClassCastException.
if (t instanceof UserRecordFailedException) {
UserRecordResult failed = ((UserRecordFailedException) t).getResult();
int lastIndex = failed.getAttempts().size() - 1;
if (lastIndex > 0) {
Attempt last = failed.getAttempts().get(lastIndex);
Attempt previous = failed.getAttempts().get(lastIndex - 1);
LOG.error(String.format(
"Failed to put record - %s : %s. Previous failure - %s : %s",
last.getErrorCode(), last.getErrorMessage(),
previous.getErrorCode(), previous.getErrorMessage()));
} else if (lastIndex == 0) {
Attempt last = failed.getAttempts().get(0);
LOG.error(String.format("Failed to put record - %s : %s.",
last.getErrorCode(), last.getErrorMessage()));
}
}
LOG.error("Exception during put", t);
// Failed records also count as finished.
completed.getAndIncrement();
}
@Override
public void onSuccess(UserRecordResult result) {
long totalTime = result.getAttempts().stream()
.mapToLong(a -> a.getDelay() + a.getDuration()).sum();
LOG.info("Data written successfully. Total time taken = {} ms", totalTime);
completed.getAndIncrement();
}
};
// StandardCharsets.UTF_8 is always available, so no checked exception handling is needed.
ByteBuffer data = ByteBuffer.wrap(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
// Apply backpressure: wait while more than 10,000 records are still outstanding.
while (kinesisProducer.getOutstandingRecordsCount() > 1e4) {
Thread.sleep(1);
}
// Use the current time as the partition key so that records are spread across shards.
String partitionKey = Long.toString(System.currentTimeMillis());
// Write data to the Kinesis stream. The call is asynchronous and completes via the callback.
ListenableFuture<UserRecordResult> f =
kinesisProducer.addUserRecord(streamName, partitionKey, data);
Futures.addCallback(f, myCallback, callbackThreadPool);
}
@Override
public void stop() {
if (kinesisProducer != null) {
// Send any buffered records before releasing the producer's resources.
kinesisProducer.flushSync();
kinesisProducer.destroy();
}
callbackThreadPool.shutdown();
}
}
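The service passes a partition key to addUserRecord along with the data. Kinesis Data Streams applies an MD5 hash to the partition key to obtain a 128-bit integer, and the record goes to the shard whose hash key range contains that integer. This means that records with the same partition key always land on the same shard. The following minimal sketch illustrates the mapping. It assumes that the shards divide the hash space evenly, which may not hold for a stream that has been resharded; the class name ShardMappingSketch and its methods are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical illustration of how a partition key selects a shard.
class ShardMappingSketch {
    // Size of the 128-bit hash key space: 2^128.
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // Returns the MD5 hash of the partition key as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 treats the bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Assumption: shardCount shards split the hash space into equal ranges.
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValueExact();
    }

    public static void main(String[] args) {
        // Different keys may map to different shards; equal keys always map to the same one.
        for (String key : new String[] {"vehicle-1", "vehicle-2", "vehicle-3"}) {
            System.out.println(key + " -> shard " + shardIndex(key, 4));
        }
    }
}
```

When choosing a partition key for your own data, prefer a value with many distinct values, such as a vehicle or device identifier, so that writes are spread across all shards.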
Create a Model Class
Create a TrackDetail.java class with getter and setter methods. We will use this class to carry data from the client to the server.
package com.sample.kinesis.producer.model;
public class TrackDetail {
private String vehicleId;
private String driverId;
private String driverName;
public String getVehicleId() {
return vehicleId;
}
public void setVehicleId(String vehicleId) {
this.vehicleId = vehicleId;
}
public String getDriverId() {
return driverId;
}
public void setDriverId(String driverId) {
this.driverId = driverId;
}
public String getDriverName() {
return driverName;
}
public void setDriverName(String driverName) {
this.driverName = driverName;
}
}
Add a Producer Controller
Create a ProducerController.java class and add the following code to it. We annotate this class with @RestController to make it a REST controller and with @RequestMapping(value = "/api") to map requests to the path /api. Inside this controller, we create a POST method with the path /stream to send data to the Kinesis data stream.
package com.sample.kinesis.producer.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sample.kinesis.producer.service.ProducerService;
import com.sample.kinesis.producer.model.TrackDetail;
@RestController
@RequestMapping(value = "/api")
public class ProducerController {
@Autowired
private ProducerService producerService;
@PostMapping(value = "/stream")
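public ResponseEntity<String> putIntoKinesis(@RequestBody TrackDetail trackDetail) {
ObjectMapper mapper = new ObjectMapper();
try {
String data = mapper.writeValueAsString(trackDetail);
producerService.putDataIntoKinesis(data);
} catch (JsonProcessingException e) {
// The request data could not be converted to JSON.
e.printStackTrace();
return ResponseEntity.status(500).body("Failed to serialize the request data.");
} catch (Exception e) {
// Report the failure instead of returning a success response.
e.printStackTrace();
return ResponseEntity.status(500).body("Failed to send data to Kinesis.");
}
// The producer writes asynchronously, so the record has been submitted for sending at this point.
return ResponseEntity.ok("Data submitted to Kinesis successfully!");
}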
}
Run and Test the Kinesis Producer Application
Run the Producer application and call the POST method using Postman or any similar HTTP client tool. See the example in the image below:
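If you prefer to test from code rather than from a GUI tool, the request can also be sent with the HTTP client that ships with Java 11 and later. The following is a minimal sketch. It assumes the application is running locally on port 8080 as configured earlier; the class name StreamEndpointClient and its methods are hypothetical, and any field values you send are up to you.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical test client for the POST /api/stream endpoint.
class StreamEndpointClient {
    // Builds a JSON POST request for the /api/stream path under the given base URL.
    static HttpRequest buildRequest(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/stream"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Sends the request and returns the status code followed by the response body.
    static String post(String baseUrl, String jsonBody) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(baseUrl, jsonBody), HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }
}
```

For example, with the application running, calling StreamEndpointClient.post("http://localhost:8080", "{\"vehicleId\":\"V-100\",\"driverId\":\"D-7\",\"driverName\":\"Sample Driver\"}") sends one record whose fields match the TrackDetail model.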

Summary
Congratulations! You have learned how to create a Producer application that sends data to a Kinesis Data Stream. Next, we recommend learning how to read data from a Kinesis Data Stream using a Consumer application. See the Consumer example here.