Write Data to a Kafka Topic using Confluent Kafka in Python

In this tutorial, you will learn how to write data to a Kafka topic in Python.

To send data to a Kafka topic, we will use Confluent Kafka which is one of the best Python client libraries for Apache Kafka. It provides a high level Producer, Consumer, and AdminClient.

An application that writes data to a Kafka topic is called a Producer Application.

The steps below shows how to install the Confluent Kafka library and write code for a Kafka producer application in Python:

Install Confluent Kafka

Install the latest version of Python Confluent Kafka Library:


pip install confluent-kafka

Producer

Producer code to send data to a Kafka topic:


from confluent_kafka import Producer

class ExampleProducer:
    broker = "localhost:9092"
    topic = "test-topic"
    producer = None

    def __init__(self):
        self.producer = Producer({
            'bootstrap.servers': self.broker,
            'socket.timeout.ms': 100,
            'api.version.request': 'false',
            'broker.version.fallback': '0.9.0',
        }
        )

    def delivery_report(self, err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(
                msg.topic(), msg.partition()))

    def send_msg_async(self, msg):
        print("Send message asynchronously")
        self.producer.produce(
            self.topic,
            msg,
            callback=lambda err, original_msg=msg: self.delivery_report(err, original_msg
                                                                        ),
        )
        self.producer.flush()

    def send_msg_sync(self, msg):
        print("Send message synchronously")
        self.producer.produce(
            self.topic,
            msg,
            callback=lambda err, original_msg=msg: self.delivery_report(
                err, original_msg
            ),
        )
        self.producer.flush()


#SENDING DATA TO KAFKA TOPIC
example_producer = ExampleProducer()
message = "Hello this message will be sent to the kafka topic."
example_producer.send_msg_async(message)

Kafka Producer application is ready. Next, run the Kafka Services and create a topic test-topic on your local machine.

If you do not have Kafka server already setup on your local machine then see how to do it here for Windows and here for Mac/Ubuntu/Linux.