Write Data to a Kafka Topic using Confluent Kafka in Python

In this tutorial, you will learn how to write data to a Kafka topic in Python.

To send data to a Kafka topic, we will use Confluent Kafka library which is one of the best Python client libraries for Apache Kafka. It provides a high level Producer, Consumer, and AdminClient.

An application that writes data to a Kafka topic is called a Producer Application.

You can also learn how to read data from a Kafka topic here.

Follow the steps below to install Confluent Kafka library and create a producer application to send data to a Kafka topic in Python:

Install Confluent Kafka

Install the latest version of Python Confluent Kafka Library:


pip install confluent-kafka

Producer

Producer code to send data to a Kafka topic:


from confluent_kafka import Producer

class ExampleProducer:
    broker = "localhost:9092"
    topic = "test-topic"
    producer = None

    def __init__(self):
        self.producer = Producer({
            'bootstrap.servers': self.broker,
            'socket.timeout.ms': 100,
            'api.version.request': 'false',
            'broker.version.fallback': '0.9.0',
        }
        )

    def delivery_report(self, err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(
                msg.topic(), msg.partition()))

    def send_msg_async(self, msg):
        print("Send message asynchronously")
        self.producer.produce(
            self.topic,
            msg,
            callback=lambda err, original_msg=msg: self.delivery_report(err, original_msg
                                                                        ),
        )
        self.producer.flush()

    def send_msg_sync(self, msg):
        print("Send message synchronously")
        self.producer.produce(
            self.topic,
            msg,
            callback=lambda err, original_msg=msg: self.delivery_report(
                err, original_msg
            ),
        )
        self.producer.flush()


#SENDING DATA TO KAFKA TOPIC
example_producer = ExampleProducer()
message = "Hello this message will be sent to the kafka topic."
example_producer.send_msg_async(message)

Kafka Producer application is ready. Next, run the Kafka Services and create a topic test-topic on your local machine.

If you do not have Apache Kafka server already setup on your local machine then learn how to do it here for Windows and here for Mac/Ubuntu/Linux.