Reading Data from a Kafka Topic using kafka-python - Consumer Example

In this tutorial, you will learn how to create a consumer application to read data from a Kafka topic using kafka-python client library.

kafka-python is a Python client for the Apache Kafka. It is designed to work much like the official Java client.

kafka-python is recommended to use with newer versions (0.9+) of Kafka brokers. However, it is backwards compatible with previous versions (to 0.8.0). Some features such as a dynamic partition assignment to multiple consumer in the same group and rebalancing multiple consumers with partition assignments based on failures are only supported on newer version (0.9+) of Kafka brokers.

To learn how to write data to a Kafka topic, read here.

Follow these steps to create a sample consumer application:

Installing kafka-python

Install kafka-python Library:

pip install kafka-python

Creating Consumer

A consumer application implements the KafkaConsumer API to read data from a Kafka topic. KafkaConsumer is a high-level message/data consumer. KafkaConsumer is not thread-safe. However, multiprocessing is recommended as we can use it in a thread-local manner.

from kafka import KafkaConsumer, consumer
from time import sleep
import json

class MessageConsumer:
    broker = ""
    topic = ""
    group_id = ""
    logger = None

    def __init__(self, broker, topic, group_id): = broker
        self.topic = topic
        self.group_id = group_id

    def activate_listener(self):
        consumer = KafkaConsumer(,
                                 value_deserializer=lambda m: json.loads(m.decode('ascii')))

        print("consumer is listening....")
            for message in consumer:
                print("received message = ", message)
                #committing message manually after reading from the topic
        except KeyboardInterrupt:
            print("Aborted by user...")

#Running multiple consumers
broker = 'localhost:9092'
topic = 'test-topic'
group_id = 'consumer-1'

consumer1 = MessageConsumer(broker,topic,group_id)

consumer2 = MessageConsumer(broker,topic,group_id)

Learn how to set up Apache Kafka server on your local machine here for Windows and here for Mac/Ubuntu/Linux.