Reading Data from a Kafka Topic using Confluent Kafka in Python

In this tutorial, you will learn how to read data from a Kafka topic in Python.

To read data from a Kafka topic, we will use Confluent Kafka (the confluent-kafka package), Confluent's Python client library for Apache Kafka. It provides a high-level Producer, Consumer, and AdminClient.

An application that reads data from a Kafka topic is called a Consumer application.

The steps below show how to install the Confluent Kafka library and write the code for a Kafka consumer application in Python:

Install Confluent Kafka

Install the Python Confluent Kafka Library:


pip install confluent-kafka
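
To confirm that the installation worked, a quick check like the one below can help. It is a minimal sketch: the helper name describe_install is made up for this example, while version() and libversion() are the library's own functions for reporting the client and librdkafka versions.

```python
def describe_install():
    """Report the installed confluent-kafka version (hypothetical helper)."""
    try:
        import confluent_kafka
    except ImportError:
        return "confluent-kafka is not installed"
    # Both calls return a (version_string, version_int) tuple
    client_version, _ = confluent_kafka.version()
    librdkafka_version, _ = confluent_kafka.libversion()
    return "confluent-kafka {} (librdkafka {})".format(
        client_version, librdkafka_version
    )


print(describe_install())
```

If the line reports that the package is not installed, run the pip command again in the same Python environment that you use to run the consumer.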

Consumer

Consumer code to read data from a Kafka topic:


from confluent_kafka import Consumer

class ExampleConsumer:
    broker = "localhost:9092"
    topic = "test-topic"
    group_id = "consumer-1"

    def start_listener(self):
        consumer_config = {
            'bootstrap.servers': self.broker,
            'group.id': self.group_id,
            # start from the newest messages when the group has no committed offset
            'auto.offset.reset': 'latest',
            # offsets are committed manually after each message is handled
            'enable.auto.commit': False,
            # maximum time allowed between two polls (24 hours)
            'max.poll.interval.ms': 86400000
        }

        consumer = Consumer(consumer_config)
        consumer.subscribe([self.topic])

        try:
            while True:
                print("Listening")
                # read a single message at a time, waiting up to 5 seconds for it
                msg = consumer.poll(5.0)

                if msg is None:
                    # no message arrived within the timeout, so poll again
                    continue
                if msg.error():
                    print("Error reading message : {}".format(msg.error()))
                    continue
                # You can parse the message and save it to a database here
                print(msg.value())
                # commit the offset of the message that was just handled
                consumer.commit(message=msg)

        except Exception as ex:
            print("Kafka Exception : {}".format(ex))

        finally:
            print("closing consumer")
            consumer.close()

# Run the consumer to read messages from the Kafka topic
if __name__ == "__main__":
    my_consumer = ExampleConsumer()
    my_consumer.start_listener()
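
The comment in the consumer loop marks the place where a message would be parsed before it is saved. The sketch below shows one possible way to do that. It assumes that message values are UTF-8 encoded JSON, which the tutorial does not state. The names message_to_record and FakeMessage are hypothetical: FakeMessage only imitates the accessor methods of a consumed message so that the helper can be tried without a running broker.

```python
import json


def message_to_record(msg):
    """Convert a consumed message into a plain dict (assumes UTF-8 JSON values)."""
    raw_key = msg.key()
    raw_value = msg.value()
    return {
        "topic": msg.topic(),
        "partition": msg.partition(),
        "offset": msg.offset(),
        # key and value arrive as bytes, or None when they are absent
        "key": raw_key.decode("utf-8") if raw_key is not None else None,
        "value": json.loads(raw_value.decode("utf-8")) if raw_value is not None else None,
    }


class FakeMessage:
    """Hypothetical stand-in exposing the same accessors as a Kafka message."""

    def __init__(self, topic, partition, offset, key, value):
        self._topic, self._partition, self._offset = topic, partition, offset
        self._key, self._value = key, value

    def topic(self):
        return self._topic

    def partition(self):
        return self._partition

    def offset(self):
        return self._offset

    def key(self):
        return self._key

    def value(self):
        return self._value


sample = FakeMessage("test-topic", 0, 42, b"user-1", b'{"id": 1, "name": "demo"}')
print(message_to_record(sample))
```

In the consumer loop, the same helper could be called on msg just before the commit, and the resulting dict passed to whatever storage code you use.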

The Kafka consumer application is ready. Next, start the Kafka services and create a topic named test-topic on your local machine.
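
The topic can be created with Kafka's command-line tools, but the AdminClient that ships with Confluent Kafka is another option. The following is a minimal sketch, assuming a single local broker at localhost:9092. The function name create_topic is made up for this example, and the choice of one partition and a replication factor of one is also an assumption.

```python
def create_topic(topic="test-topic", broker="localhost:9092"):
    """Create a topic on a local broker and report whether it succeeded."""
    # Imported inside the function so this file can be loaded even where
    # confluent-kafka is not installed yet
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({"bootstrap.servers": broker})
    # create_topics returns a dict that maps each topic name to a future
    futures = admin.create_topics(
        [NewTopic(topic, num_partitions=1, replication_factor=1)]
    )
    try:
        # result() returns None on success and raises on failure,
        # for example when the topic already exists
        futures[topic].result(timeout=30)
        print("Created topic {}".format(topic))
        return True
    except Exception as ex:
        print("Could not create topic {} : {}".format(topic, ex))
        return False
```

With the broker running, calling create_topic() once before starting the consumer should be enough. Calling it a second time is expected to report a failure because the topic already exists.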

If you do not already have a Kafka server set up on your local machine, set one up first by following an installation guide for your platform (Windows or Mac/Ubuntu/Linux).