Send Data to Amazon MSK Topic from AWS Lambda Python Example

  • Last updated Apr 25, 2024

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that simplifies running Apache Kafka clusters, which makes it a good choice for real-time data streaming. AWS Lambda, on the other hand, is a serverless compute service that lets you run code without provisioning or managing servers. Used together, the two services let you build event-driven pipelines in which a Lambda function publishes data to a Kafka topic.

Here are the steps to send data to an Amazon MSK topic from AWS Lambda using Python:

STEP 1: Create AWS Lambda Function

Do the following to create a Kafka producer Lambda function:

  1. Go to Lambda console at https://console.aws.amazon.com/lambda/home. Click the Create function button.
  2. Choose Author from scratch.
  3. Enter the function name of your choice in the Function name field.
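  4. Choose Python 3.10 or your preferred version as your Runtime.
  5. For Execution role, leave the default setting. (To reach your MSK brokers, the function must also be attached to the cluster's VPC, which requires the role to have EC2 network-interface permissions, for example through the AWSLambdaVPCAccessExecutionRole managed policy.)
  6. Choose Create function.
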
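
If you prefer to script this step, the console actions above map roughly onto the Lambda create_function API. The following is a minimal sketch using boto3, assuming boto3 is installed and AWS credentials are configured. Unlike the console, the API does not create an execution role for you, so you must pass an existing role ARN. The helper names, the function name, and the role ARN are hypothetical placeholders; the handler string lambda_function.lambda_handler matches the file and function names used in STEP 3.

```python
import io
import zipfile

# Placeholder handler source; replace it with the code from STEP 3.
HANDLER_SOURCE = "def lambda_handler(event, context):\n    return {'ok': True}\n"


def build_deployment_zip(source: str) -> bytes:
    """Return an in-memory zip archive containing lambda_function.py."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("lambda_function.py", source)
    return buf.getvalue()


def create_function_args(name: str, role_arn: str, runtime: str = "python3.10") -> dict:
    """Hypothetical helper: keyword arguments for boto3's create_function call."""
    return {
        "FunctionName": name,
        "Runtime": runtime,
        "Role": role_arn,  # an existing IAM role ARN (the API will not create one)
        "Handler": "lambda_function.lambda_handler",  # <module>.<function>
        "Code": {"ZipFile": build_deployment_zip(HANDLER_SOURCE)},
    }


def create_function(name: str, role_arn: str) -> dict:
    """Hypothetical helper: create the function through the Lambda API."""
    import boto3  # imported here so the rest of the sketch works without boto3

    client = boto3.client("lambda")
    return client.create_function(**create_function_args(name, role_arn))
```

For example, create_function("my-kafka-producer", "arn:aws:iam::123456789012:role/my-lambda-role") would create the function, with both arguments replaced by your own values.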
STEP 2: Add Kafka Python Library to AWS Lambda

This example uses the confluent-kafka Python Kafka client library as a dependency to send data to the Amazon MSK topic.

To add this library to your Lambda Function, do the following:

  1. Open a terminal, create a folder, and navigate into it:

    mkdir mykafkalib
    cd mykafkalib
  2. Inside that folder, create a requirements.txt file that lists the confluent-kafka library. For example:

    confluent-kafka==2.2.0
  3. Run the following command to install the libraries for your runtime into the sub folder that Lambda layers expect (replace python3.10 with your Python version in both the image name and the path).

    Note: You must have Docker installed on your computer to run this command. Prefix it with sudo if you get a permission error.

    docker run -v "$PWD":/var/task "public.ecr.aws/sam/build-python3.10" /bin/sh -c "pip install -r requirements.txt -t python/lib/python3.10/site-packages/; exit"
  4. Zip the python folder to create the layer package:

    zip -r mykafkalib.zip python > /dev/null
  5. In the AWS Lambda console, go to Layers and choose Create layer. Enter a name, upload the zip file you created, and choose Create.
  6. Open your Lambda function and scroll down to the Layers section, then choose Add a layer.
  7. On the Add layer page, select Custom layers for Layer source, select the layer you created from the Custom layers dropdown list, select the version, and choose Add.

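
The layer zip must keep the python folder at its root, because Lambda extracts layers into /opt and adds paths such as /opt/python/lib/python3.x/site-packages to the Python search path. If the zip command is not available on your machine, the following sketch uses only the Python standard library to build the same archive as zip -r mykafkalib.zip python, plus a check that every file sits under the python/lib/python3.10/site-packages/ prefix used in this guide. The function names are illustrative, not part of any AWS tooling.

```python
import os
import zipfile


def zip_layer(src_root: str, zip_path: str) -> list:
    """Zip src_root/python into zip_path, keeping 'python/' as the top-level
    folder inside the archive. Returns the archive names that were written."""
    names = []
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for dirpath, _dirnames, filenames in os.walk(os.path.join(src_root, "python")):
            for filename in filenames:
                full_path = os.path.join(dirpath, filename)
                # Store paths relative to src_root, e.g. python/lib/python3.10/...
                arcname = os.path.relpath(full_path, src_root)
                zf.write(full_path, arcname)
                names.append(arcname.replace(os.sep, "/"))
    return names


def has_valid_layout(zip_path: str, python_version: str = "3.10") -> bool:
    """True if the archive has files and all of them are under
    python/lib/pythonX.Y/site-packages/."""
    prefix = f"python/lib/python{python_version}/site-packages/"
    with zipfile.ZipFile(zip_path) as zf:
        files = [n for n in zf.namelist() if not n.endswith("/")]
    return bool(files) and all(n.startswith(prefix) for n in files)
```

Running zip_layer(".", "mykafkalib.zip") from inside mykafkalib and then has_valid_layout("mykafkalib.zip") is a quick way to confirm the package before uploading it.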
STEP 3: Update Lambda Function Code

Update your lambda_function.py file with the following code to send data to the MSK topic:

import json
from time import time

from confluent_kafka import Producer

# Replace with your cluster's bootstrap broker string (comma-separated host:port list).
KAFKA_BROKER = 'your-kafka-broker-endpoint-1.amazonaws.com:9092'
KAFKA_TOPIC = 'your_topic_name'

# Created once at module load so warm invocations reuse the same producer.
producer = Producer({
    'bootstrap.servers': KAFKA_BROKER,
    # Largest message the client will produce; the broker/topic limit still applies.
    'message.max.bytes': 1000000000
})


def lambda_handler(event, context):
    print("event = ", event)
    start_time = int(time() * 1000)
    # This is test data; you can build the payload from the event instead
    data = {"name": "xyz", "email": "xyz@"}

    send_msg_async(data)

    end_time = int(time() * 1000)
    time_taken = (end_time - start_time) / 1000
    print("Time taken to complete = %s seconds" % time_taken)


def delivery_report(err, msg):
    """Delivery callback: called once per message with the broker's result."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(
            msg.topic(), msg.partition()))


def send_msg_async(msg):
    print("Sending message")
    try:
        # Serialize the payload to a JSON string
        msg_json_str = json.dumps({"data": msg})
        producer.produce(
            KAFKA_TOPIC,
            msg_json_str,
            callback=delivery_report,
        )
        # Block until queued messages are delivered; delivery_report runs here
        producer.flush()
    except Exception as ex:
        print("Error : ", ex)
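
Calling producer.flush() with no timeout waits until every queued message is delivered or fails, which can take until librdkafka's message.timeout.ms expires (300000 ms by default), far longer than Lambda's default 3-second function timeout. One option is to bound the flush by the time the invocation has left. The sketch below assumes you pass the Lambda context object through to the code that flushes; flush_within_deadline and its 500 ms safety margin are hypothetical choices, not part of confluent-kafka.

```python
def flush_within_deadline(producer, context, safety_margin_ms: int = 500) -> int:
    """Flush queued messages, but stop before the Lambda invocation times out.

    producer: a confluent_kafka.Producer (anything with flush(timeout) -> int).
    context:  the Lambda context object passed to lambda_handler.
    Returns the number of messages still queued (0 means all were delivered).
    """
    remaining_ms = context.get_remaining_time_in_millis() - safety_margin_ms
    # Producer.flush takes seconds; clamp at 0 because a negative value
    # would mean "wait indefinitely".
    timeout_s = max(remaining_ms, 0) / 1000
    return producer.flush(timeout_s)
```

In lambda_handler you would call undelivered = flush_within_deadline(producer, context) in place of producer.flush() and log or raise an error when undelivered is greater than 0.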

Choose Deploy to save your changes. The Lambda function is now ready to send messages to the MSK topic.
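
The handler sends hard-coded test data. To forward real input instead, you can build the payload from the event. The sketch below assumes an API Gateway proxy-style event, in which the request body arrives as a JSON string in event["body"] (base64-encoded when event["isBase64Encoded"] is true); any other event, such as a console test event, is forwarded unchanged. payload_from_event is a hypothetical helper, and the event shape you actually receive depends on what triggers your function.

```python
import base64
import json


def payload_from_event(event: dict):
    """Return the data to send to Kafka for a given Lambda event."""
    body = event.get("body")
    if body is None:
        # Not a proxy-style event: send the event itself
        return event
    if event.get("isBase64Encoded"):
        body = base64.b64decode(body).decode("utf-8")
    # The body is expected to be a JSON document
    return json.loads(body)
```

With this helper, the test-data line in lambda_handler becomes data = payload_from_event(event).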