Reading Data from a Kafka Topic using kafka-python - Consumer Example

In this tutorial, you will learn how to create a consumer application that reads data from a Kafka topic using the kafka-python client library.

kafka-python is a Python client for Apache Kafka. It is designed to work much like the official Java client.

kafka-python works best with newer Kafka brokers (0.9+), but it is backwards compatible with older versions (down to 0.8.0). Some features are supported only on 0.9+ brokers, such as dynamic partition assignment to multiple consumers in the same group, and rebalancing of those assignments when a consumer fails.
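To make the idea of dynamic partition assignment concrete, here is a minimal pure-Python sketch of how partitions could be dealt out to the members of a consumer group, and how the split changes when one member fails. The function name assign_round_robin and the consumer names are hypothetical. The sketch only illustrates the concept; it is not the assignor code that Kafka or kafka-python actually runs.

```python
def assign_round_robin(partitions, members):
    """Deal partitions out to group members one at a time (illustration only)."""
    assignment = {member: [] for member in members}
    if not members:
        return assignment
    ordered = sorted(members)
    for index, partition in enumerate(sorted(partitions)):
        owner = ordered[index % len(ordered)]
        assignment[owner].append(partition)
    return assignment


partitions = [0, 1, 2, 3, 4, 5]

# Three consumers in the same group share the six partitions.
before = assign_round_robin(partitions, ["consumer-a", "consumer-b", "consumer-c"])

# "consumer-b" fails: a rebalance spreads its partitions over the survivors.
after = assign_round_robin(partitions, ["consumer-a", "consumer-c"])

print(before)
print(after)
```

Each partition is owned by exactly one consumer in the group at any time, which is why starting more consumers than there are partitions leaves some of them idle.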

To learn how to write data to a Kafka topic, read here.

Follow these steps to create a sample consumer application:

Installing kafka-python

Install the kafka-python library:


pip install kafka-python

Creating a Consumer

A consumer application uses the KafkaConsumer API to read data from a Kafka topic. KafkaConsumer is a high-level message consumer. It is not thread-safe: although it can be used in a thread-local manner, multiprocessing is the recommended way to run several consumers at once.


import json

from kafka import KafkaConsumer


class MessageConsumer:
    broker = ""
    topic = ""
    group_id = ""
    logger = None

    def __init__(self, broker, topic, group_id):
        self.broker = broker
        self.topic = topic
        self.group_id = group_id

    def activate_listener(self):
        consumer = KafkaConsumer(bootstrap_servers=self.broker,
                                 group_id=self.group_id,
                                 consumer_timeout_ms=60000,
                                 auto_offset_reset='earliest',
                                 enable_auto_commit=False,
                                 value_deserializer=lambda m: json.loads(m.decode('utf-8')))

        consumer.subscribe([self.topic])
        print("consumer is listening....")
        try:
            for message in consumer:
                print("received message = ", message)
                
                # Commit the offset manually once the message has been processed
                consumer.commit()
        except KeyboardInterrupt:
            print("Aborted by user...")
        finally:
            consumer.close()


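# Running multiple consumers
# activate_listener() blocks, so each consumer runs in its own process.
from multiprocessing import Process


def run_consumer(broker, topic, group_id):
    MessageConsumer(broker, topic, group_id).activate_listener()


if __name__ == "__main__":
    broker = 'localhost:9092'
    topic = 'test-topic'
    group_id = 'consumer-1'

    # Two consumers, both started with the same group_id
    workers = [Process(target=run_consumer, args=(broker, topic, group_id)) for _ in range(2)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()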
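The listener above commits after every single message. As a possible variation, the sketch below polls a batch of records, processes all of them, and commits once afterwards. It relies on KafkaConsumer.poll(), which returns a dictionary mapping each partition to a list of records, and on KafkaConsumer.commit(). The helper names decode_json and consume_batch are hypothetical, and _StubConsumer is only a stand-in so the example can run without a broker. In a real application you would pass a KafkaConsumer created with value_deserializer=decode_json.

```python
import json
from types import SimpleNamespace


def decode_json(raw):
    """Decode UTF-8 JSON bytes; pass None through (a record without a value)."""
    return None if raw is None else json.loads(raw.decode("utf-8"))


def consume_batch(consumer, handle, timeout_ms=1000, max_records=100):
    """Poll once, handle every record, then commit. Returns the record count."""
    batches = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
    count = 0
    for records in batches.values():
        for record in records:
            handle(record.value)
            count += 1
    if count:
        # Reached only if no handler raised, so a failed batch stays uncommitted.
        consumer.commit()
    return count


class _StubConsumer:
    """Hypothetical stand-in that mimics poll() and commit() for a dry run."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.commits = 0

    def poll(self, timeout_ms=0, max_records=None):
        return self._batches.pop(0) if self._batches else {}

    def commit(self):
        self.commits += 1


# Dry run with two fake records on one partition (the key is a placeholder).
stub = _StubConsumer([{
    "test-topic-0": [
        SimpleNamespace(value=decode_json(b'{"id": 1}')),
        SimpleNamespace(value=decode_json(b'{"id": 2}')),
    ]
}])
seen = []
first = consume_batch(stub, seen.append)
second = consume_batch(stub, seen.append)
print(first, second, stub.commits, seen)
```

Committing only after processing gives at-least-once behaviour: if the application stops before the commit, the uncommitted records are read again after a restart or rebalance, so the handler should tolerate duplicates.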

Learn how to set up an Apache Kafka server on your local machine here for Windows and here for Mac/Ubuntu/Linux.