Reading Data from a Kafka Topic using Confluent Kafka in Python

The Kafka Consumer is responsible for reading data from a Kafka topic.

This tutorial will guide you through creating a Kafka Consumer in Python using the Confluent Kafka library to read data from a Kafka topic. Confluent Kafka is one of the best Python client libraries for Apache Kafka; it provides a high-level Producer, Consumer, and AdminClient.

An application that reads data from a Kafka topic is called a Consumer application.

You can also write data to a Kafka topic using the library's Producer. Here is a minimal producer sketch (assuming the same local broker at localhost:9092 and the test-topic topic used in the consumer example below):
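
    from confluent_kafka import Producer

    def delivery_report(err, msg):
        # Called once per produced message to report delivery success or failure
        if err is not None:
            print("Delivery failed: {}".format(err))
        else:
            print("Delivered to {} [{}]".format(msg.topic(), msg.partition()))

    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    producer.produce('test-topic', value='hello kafka', callback=delivery_report)
    producer.flush()  # block until all queued messages are delivered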

Follow these steps to install Confluent Kafka library and create a Consumer to read data from a Kafka topic in Python:

  1. Installing Confluent Kafka: Install the latest version of the Python confluent-kafka library:

    pip install confluent-kafka
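
    To verify the installation, you can print the installed client version (an optional check; confluent_kafka.version() returns the client's version):

    python -c "import confluent_kafka; print(confluent_kafka.version())"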
  2. Creating the Consumer: Here's an example of a Kafka Consumer implemented using the confluent-kafka library:

    from confluent_kafka import Consumer

    class ExampleConsumer:
        broker = "localhost:9092"
        topic = "test-topic"
        group_id = "consumer-1"

        def start_listener(self):
            consumer_config = {
                'bootstrap.servers': self.broker,
                'group.id': self.group_id,
                # start from the latest offset when no committed offset exists
                'auto.offset.reset': 'latest',
                # commit offsets manually, after each message is processed
                'enable.auto.commit': 'false',
                # allow up to 24 hours between polls before the consumer
                # is considered dead and its partitions are reassigned
                'max.poll.interval.ms': '86400000'
            }

            consumer = Consumer(consumer_config)
            consumer.subscribe([self.topic])

            try:
                while True:
                    print("Listening")
                    # wait up to 5 seconds for a single message
                    msg = consumer.poll(timeout=5.0)

                    if msg is None:
                        continue
                    if msg.error():
                        print("Error reading message: {}".format(msg.error()))
                        continue
                    # parse the message and save it to a database here
                    print(msg.value())
                    # mark the message as processed
                    consumer.commit()

            except Exception as ex:
                print("Kafka Exception: {}".format(ex))

            finally:
                print("Closing consumer")
                consumer.close()

    # Running the consumer to read messages from the Kafka topic
    my_consumer = ExampleConsumer()
    my_consumer.start_listener()

    Here, broker specifies the Kafka broker's address, topic is the Kafka topic the consumer reads from, and group_id identifies the consumer group. The consumer is created with this configuration and subscribed to the topic, and a loop then listens for messages continuously. The poll method waits up to 5 seconds for a single message; if none arrives, it returns None and the loop polls again. When a message is received, its payload is printed and, because auto-commit is disabled, consumer.commit() marks it as processed. You can replace the print call with processing logic that suits your use case, as in the sketch below.
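
    For example, if your producers send UTF-8 encoded JSON, the processing step could look like this sketch (process_message is a hypothetical helper, and the JSON assumption depends on how your data is serialized):

    import json

    def process_message(msg):
        # msg.value() returns the raw message payload as bytes
        payload = json.loads(msg.value().decode('utf-8'))
        # save to a database or hand off to downstream processing here
        print("key={}, value={}".format(msg.key(), payload))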