Send Data to Amazon MSK Topic from AWS Lambda Python Example
In this tutorial, we will learn how to send data to an Amazon MSK topic from an AWS Lambda function using the confluent-kafka Python library.
Follow the steps below to send data to an Amazon MSK topic from AWS Lambda in Python:
Create AWS Lambda Function
Go to the AWS Lambda console and do the following to create a Kafka producer Lambda function:
- We will create the function from scratch, so keep the Author from scratch option selected.
- Enter a function name of your choice in the Function name field. I am entering myProducer for this example.
- Select the Python version for Runtime. I am selecting Python 3.8 for this example.
- Leave the Execution role as it is.
- Choose the Create function button. (If you prefer to script this step, see the boto3 sketch after this list.)
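If you would rather create the function programmatically, the following is a minimal boto3 sketch of the same step. It is only an illustration: unlike the console flow, the API call requires an existing execution role ARN and a deployment package, so the role ARN and zip file name below are placeholders you would substitute.
import boto3

lambda_client = boto3.client("lambda")

# Placeholder: ARN of an existing Lambda execution role in your account.
role_arn = "arn:aws:iam::123456789012:role/your-lambda-execution-role"

# Placeholder: a zip containing lambda_function.py (can be updated later).
with open("deployment.zip", "rb") as f:
    lambda_client.create_function(
        FunctionName="myProducer",
        Runtime="python3.8",
        Role=role_arn,
        Handler="lambda_function.lambda_handler",  # file name.function name
        Code={"ZipFile": f.read()},
    )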
Add Kafka Python Library to Lambda
This example uses the confluent-kafka Python client library to send data to the Amazon MSK topic. To add this library to your Lambda function as a layer, do the following:
- Open your terminal, create a folder, and navigate into the folder you just created:
mkdir mykafkalib
cd mykafkalib
- Inside the folder, create a requirements.txt file and add the following dependency for the confluent-kafka library:
confluent-kafka==1.7.0
- Run the following command to install the library in the sub-folder expected by your runtime (replace python3.8 with your Python version). Note: You must have Docker installed on your computer to run this command:
docker run -v "$PWD":/var/task "lambci/lambda:build-python3.8" /bin/sh -c "pip3.8 install -r requirements.txt -t python/lib/python3.8/site-packages/; exit"
- Zip the python folder to create the layer package by running the following command:
zip -r mykafkalib.zip python > /dev/null
- Go to the AWS Lambda console > Layers. Enter the details, upload the zip file you created, and click on the Create button. Refer to the image below for an example.
- Go to your Lambda function and scroll down until you see the option to Add a layer, as shown in the example image below.
- On the Add layer page, select Custom layers for Layer source, select the layer you created from the Custom layers dropdown list, then select the version and choose the Add button. Refer to the image below for an example. (A boto3 alternative to these console steps is sketched after this list.)
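If you prefer not to use the console for publishing and attaching the layer, here is a minimal boto3 sketch of the same idea. It assumes the mykafkalib.zip file built above and the myProducer function created earlier; treat it as an illustration rather than the exact console workflow.
import boto3

lambda_client = boto3.client("lambda")

# Publish the zip built above as a new layer version.
with open("mykafkalib.zip", "rb") as f:
    layer = lambda_client.publish_layer_version(
        LayerName="mykafkalib",
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.8"],
    )

# Attach the newly published layer version to the producer function.
lambda_client.update_function_configuration(
    FunctionName="myProducer",
    Layers=[layer["LayerVersionArn"]],
)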



Update Lambda Function Code
Update your lambda_function.py file with the following code to send data to the MSK topic:
import json
from time import time

from confluent_kafka import Producer

# Replace these placeholders with your MSK broker endpoint and topic name.
KAFKA_BROKER = 'your-kafka-broker-endpoint-1.amazonaws.com:9092'
KAFKA_TOPIC = 'your_topic_name'

# The producer is created outside the handler so it is reused across
# warm invocations of the Lambda function.
producer = Producer({
    'bootstrap.servers': KAFKA_BROKER,
    'socket.timeout.ms': 100,
    'api.version.request': 'false',
    'broker.version.fallback': '0.9.0',
    'message.max.bytes': 1000000000
})


def lambda_handler(event, context):
    print("event = ", event)
    start_time = int(time() * 1000)
    # This is test data; you can build it from the event instead.
    data = {"name": "xyz", "email": "xyz@"}
    send_msg_async(data)
    end_time = int(time() * 1000)
    time_taken = (end_time - start_time) / 1000
    print("Time taken to complete = %s seconds" % time_taken)


def delivery_report(err, msg):
    # Called once per message to report delivery success or failure.
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(
            msg.topic(), msg.partition()))


def send_msg_async(msg):
    print("Sending message")
    try:
        msg_json_str = str({"data": json.dumps(msg)})
        producer.produce(
            KAFKA_TOPIC,
            msg_json_str,
            callback=lambda err, original_msg=msg_json_str: delivery_report(
                err, original_msg),
        )
        producer.flush()
    except Exception as ex:
        print("Error : ", ex)
Finally, click on the Deploy button to deploy the changes to this function. The Lambda function is now ready to send messages to the MSK topic.
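To quickly verify the setup, you can invoke the function with a test event, either with the Test button in the Lambda console or from a small script like the sketch below. This is only an illustration: it assumes the myProducer function name used earlier, and any payload will do since the handler above currently sends hard-coded test data.
import json
import boto3

lambda_client = boto3.client("lambda")

# Invoke the producer function synchronously with a dummy test event.
response = lambda_client.invoke(
    FunctionName="myProducer",
    Payload=json.dumps({"test": "event"}),
)

# A 200 status code means the invocation itself succeeded; check the
# function's CloudWatch logs for the "Message delivered to ..." output
# printed by the delivery callback.
print(response["StatusCode"])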