Reading Data from a Kafka Topic using kafka-python - Consumer Example
In this tutorial, you will learn how to create a consumer application that reads data from a Kafka topic using the kafka-python client library.
kafka-python is a Python client for Apache Kafka. It is designed to work much like the official Java client.
kafka-python works best with newer Kafka brokers (0.9+), but it is backward compatible with older versions (down to 0.8.0). Some features, such as dynamic partition assignment to multiple consumers in the same group and rebalancing partitions among consumers when one of them fails, are only supported on newer (0.9+) brokers.
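To picture what "dynamic partition assignment to multiple consumers in the same group" means, here is a simplified, pure-Python sketch of a range-style strategy. `range_assign` is a hypothetical helper written for illustration only; it is not part of kafka-python, whose real assignors run inside the consumer group rebalance protocol on 0.9+ brokers.

```python
def range_assign(partitions, consumers):
    """Simplified range-style assignment for a single topic (illustration only).

    Members are sorted, each gets len(partitions) // len(consumers)
    partitions, and the first len(partitions) % len(consumers) members
    get one extra partition each.
    """
    members = sorted(consumers)
    if not members:
        return {}
    parts = sorted(partitions)
    per_member, extra = divmod(len(parts), len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        count = per_member + (1 if i < extra else 0)
        assignment[member] = parts[start:start + count]
        start += count
    return assignment
```

For example, with five partitions and two consumers in the same group, `range_assign(range(5), ['consumer-a', 'consumer-b'])` gives `consumer-a` partitions 0, 1 and 2 and `consumer-b` partitions 3 and 4. If `consumer-b` fails, running the assignment again with only `['consumer-a']` hands all five partitions to the surviving consumer, which is the kind of failure-driven rebalancing described above. It also shows why a group with more consumers than partitions leaves some consumers idle.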
To learn how to write data to a Kafka topic, read here.
Follow these steps to create a sample consumer application:
Install kafka-python Library:
pip install kafka-python
Creating the Kafka Consumer
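A consumer application uses the KafkaConsumer class to read data from a Kafka topic. KafkaConsumer is a high-level message consumer. It is not thread-safe, so a single instance should not be shared across threads. Instead, each thread should create and use its own consumer (a thread-local instance), or, as is commonly recommended, each consumer should run in its own process using multiprocessing.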
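One way to use a consumer "in a thread-local manner" is to let every thread lazily build and keep its own instance, so no consumer is ever shared. The sketch below uses the standard library's `threading.local`; `get_thread_consumer` is a hypothetical helper, and it accepts any zero-argument factory, which in real code could be something like `lambda: KafkaConsumer(bootstrap_servers='localhost:9092', group_id='consumer-1')`.

```python
import threading

# Per-thread storage: each thread sees its own attributes on this object.
_thread_state = threading.local()


def get_thread_consumer(factory):
    """Return the consumer owned by the calling thread (hypothetical helper).

    The first call in a thread builds a consumer with factory(); later calls
    in the same thread return that same instance, so no consumer is ever
    shared between threads.
    """
    consumer = getattr(_thread_state, "consumer", None)
    if consumer is None:
        consumer = factory()
        _thread_state.consumer = consumer
    return consumer
```

With this pattern, N worker threads create N consumers, and each one should still be closed by the thread that owns it. Running one consumer per process gives the same isolation without sharing an interpreter, which is why multiprocessing is the usual recommendation.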
from multiprocessing import Process
import json

from kafka import KafkaConsumer


class MessageConsumer:
    def __init__(self, broker, topic, group_id):
        self.broker = broker
        self.topic = topic
        self.group_id = group_id

    def activate_listener(self):
        consumer = KafkaConsumer(bootstrap_servers=self.broker,
                                 group_id=self.group_id,  # use the group passed to the constructor
                                 consumer_timeout_ms=60000,  # stop iterating after 60 s without messages
                                 auto_offset_reset='earliest',  # start from the oldest available offset if the group has no valid committed offset
                                 enable_auto_commit=False,  # offsets are committed manually below
                                 value_deserializer=lambda m: json.loads(m.decode('utf-8')))

        consumer.subscribe([self.topic])
        print("consumer is listening....")
        try:
            for message in consumer:
                print("received message = ", message)

                # commit the offset manually after the message has been processed
                consumer.commit()
        except KeyboardInterrupt:
            print("Aborted by user...")
        finally:
            consumer.close()


if __name__ == '__main__':
    # Running multiple consumers in the same group.
    # activate_listener() blocks, so each consumer runs in its own process;
    # Kafka then splits the topic's partitions between the two members
    # (with a single-partition topic, one of them stays idle).
    broker = 'localhost:9092'
    topic = 'test-topic'
    group_id = 'consumer-1'

    consumer1 = MessageConsumer(broker, topic, group_id)
    consumer2 = MessageConsumer(broker, topic, group_id)

    processes = [Process(target=c.activate_listener) for c in (consumer1, consumer2)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
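The `value_deserializer` lambda in the listener above calls `.decode()` and `json.loads()` directly, so a record whose value is missing (the deserializer may then receive None) or is not valid JSON typically raises during iteration, and because only KeyboardInterrupt is caught, that ends the listener. A more defensive option is a named function such as the hypothetical `json_deserializer` below; it assumes values are UTF-8 encoded JSON and that skipping a bad record is acceptable for your application.

```python
import json
import logging

logger = logging.getLogger(__name__)


def json_deserializer(raw):
    """Turn a raw Kafka record value (bytes) into a Python object.

    Returns None instead of raising when the value is missing or is not
    valid UTF-8 JSON, so one bad record does not stop the consumer loop.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Log a short prefix of the payload so the bad record can be found later
        logger.warning("skipping undecodable message: %r", raw[:50])
        return None
```

It would be passed as `value_deserializer=json_deserializer` in place of the lambda. Bad records then arrive with `message.value` set to None, so downstream code should check for None before using the value.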