Write Data to a Kafka Topic using Confluent Kafka in Python

The Kafka Producer is responsible for sending messages to a Kafka topic.

This tutorial will guide you through the process of creating a Kafka Producer in Python using the Confluent Kafka library to write data to a Kafka topic.

You can also learn how to read data from a Kafka topic here.

Follow these steps to install the Confluent Kafka library and create a producer application that sends data to a Kafka topic in Python:

  1. Install Confluent Kafka: install the latest version of the confluent-kafka Python library:

    pip install confluent-kafka

  2. Create the producer: the following Python code implements a Kafka producer using the confluent-kafka library:

    from confluent_kafka import Producer
    
    class ExampleProducer:
        broker = "localhost:9092"
        topic = "test-topic"
        producer = None
    
        def __init__(self):
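            self.producer = Producer({
                'bootstrap.servers': self.broker,
                # 100 ms is far below the library default (60000 ms);
                # raise it if requests time out on a slow network.
                'socket.timeout.ms': 100,
                # Legacy settings for brokers older than 0.10.0. Remove both
                # for newer brokers so the client can negotiate API versions.
                'api.version.request': 'false',
                'broker.version.fallback': '0.9.0',
            })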
    
        def delivery_report(self, err, msg):
            """ Called once for each message produced to indicate delivery result.
                Triggered by poll() or flush(). """
            if err is not None:
                print('Message delivery failed: {}'.format(err))
            else:
                print('Message delivered to {} [{}]'.format(
                    msg.topic(), msg.partition()))
    
        def send_msg_async(self, msg):
            print("Send message asynchronously")
            self.producer.produce(
                self.topic,
                msg,
                callback=self.delivery_report,
            )
            # Serve delivery callbacks from earlier sends without blocking.
            self.producer.poll(0)
    
        def send_msg_sync(self, msg):
            print("Send message synchronously")
            self.producer.produce(
                self.topic,
                msg,
                callback=self.delivery_report,
            )
            # Block until the message is delivered or delivery fails.
            self.producer.flush()
    
    
    # SENDING DATA TO KAFKA TOPIC
    example_producer = ExampleProducer()
    message = "Hello this message will be sent to the kafka topic."
    example_producer.send_msg_async(message)
    # Wait for outstanding messages before the script exits.
    example_producer.producer.flush()

The code above defines a Kafka producer class, configures it, and sends a message to the specified topic. Use the asynchronous method when the caller should not wait for delivery confirmation, and the synchronous method when it should.
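
When many messages are sent, a common pattern is to queue them all asynchronously and wait for delivery only once at the end. The following is a minimal sketch of that idea, not part of the original example: the names `DeliveryTracker` and `send_batch` are hypothetical, the records are assumed to be JSON-serializable dictionaries, and `producer` is assumed to behave like a `confluent_kafka.Producer` (it only needs `produce()`, `poll()`, and `flush()`).

```python
import json


class DeliveryTracker:
    """Hypothetical helper: counts delivery outcomes reported via callbacks."""

    def __init__(self):
        self.delivered = 0
        self.failed = 0

    def __call__(self, err, msg):
        # The producer invokes the callback with (err, msg);
        # err is None when the message was delivered.
        if err is not None:
            self.failed += 1
        else:
            self.delivered += 1


def send_batch(producer, topic, records, timeout=10.0):
    """Queue every record, then wait once for all deliveries.

    Returns the tracker and the number of messages still undelivered
    when flush() returned.
    """
    tracker = DeliveryTracker()
    for record in records:
        # Assumption: records are dicts sent as UTF-8 encoded JSON.
        payload = json.dumps(record).encode("utf-8")
        while True:
            try:
                producer.produce(topic, payload, callback=tracker)
                break
            except BufferError:
                # Local queue is full: serve callbacks to free space, then retry.
                producer.poll(1.0)
        # Non-blocking: fire any delivery callbacks that are already ready.
        producer.poll(0)
    remaining = producer.flush(timeout)
    return tracker, remaining
```

With a running broker, this might be called as `send_batch(Producer({'bootstrap.servers': 'localhost:9092'}), 'test-topic', [{'id': 1}, {'id': 2}])`, after which `tracker.delivered` and `tracker.failed` show how many messages succeeded or failed. Calling `poll(0)` inside the loop keeps the sends non-blocking, while the single `flush()` at the end provides the synchronous guarantee for the whole batch.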