Reading Data from a Kafka Topic using kafka-python - Consumer Example
In this tutorial, you will learn how to create a consumer application that reads data from a Kafka topic using the kafka-python client library.
kafka-python is a Python client for Apache Kafka. It is designed to work much like the official Java client.
kafka-python is best used with newer Kafka brokers (0.9+), but it is backwards compatible with older versions (down to 0.8.0). Some features, such as dynamic partition assignment across multiple consumers in the same group and rebalancing of partition assignments when a consumer fails, are only supported on newer brokers (0.9+).
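To make the idea of group partition assignment and rebalancing more concrete, here is a minimal pure-Python sketch. It does not call Kafka at all: assign_round_robin is a hypothetical helper that imitates a simple round-robin strategy, and the consumer names and partition numbers are made up for illustration. The real assignment is negotiated by the group coordinator and may differ in detail.

```python
from collections import defaultdict


def assign_round_robin(partitions, consumers):
    """Hand out partitions to the sorted consumers, one at a time (illustration only)."""
    members = sorted(consumers)
    if not members:
        return {}
    assignment = defaultdict(list)
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)
    # Members that received no partition still appear, with an empty list.
    return {member: assignment[member] for member in members}


partitions = [0, 1, 2, 3]

# Two consumers in the same group share the four partitions.
before = assign_round_robin(partitions, ["consumer-a", "consumer-b"])
print(before)  # {'consumer-a': [0, 2], 'consumer-b': [1, 3]}

# If consumer-b fails, a rebalance hands every partition to consumer-a.
after = assign_round_robin(partitions, ["consumer-a"])
print(after)  # {'consumer-a': [0, 1, 2, 3]}
```

The point of the sketch is only that every partition is owned by exactly one consumer in the group at a time, and that ownership is recomputed when group membership changes.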
To learn how to write data to a Kafka topic, read here.
Follow these steps to create a sample consumer application:
Installing kafka-python
Install the kafka-python library:
pip install kafka-python
Creating the Kafka Consumer
A consumer application uses the KafkaConsumer API to read data from a Kafka topic. KafkaConsumer is a high-level message consumer. It is not thread-safe: although it can be used in a thread-local manner, multiprocessing is recommended when you need several consumers in one application.
from kafka import KafkaConsumer
import json

class MessageConsumer:
    # class-level defaults; __init__ overrides them per instance
    broker = ""
    topic = ""
    group_id = ""
    logger = None

    def __init__(self, broker, topic, group_id):
        self.broker = broker
        self.topic = topic
        self.group_id = group_id

    def activate_listener(self):
        consumer = KafkaConsumer(bootstrap_servers=self.broker,
                                 group_id=self.group_id,
                                 # stop iterating after 60 s without new messages
                                 consumer_timeout_ms=60000,
                                 auto_offset_reset='earliest',
                                 enable_auto_commit=False,
                                 # JSON payloads are decoded as UTF-8 (a superset of ASCII)
                                 value_deserializer=lambda m: json.loads(m.decode('utf-8')))
        # subscribe() expects a list of topic names
        consumer.subscribe([self.topic])
        print("consumer is listening....")
        try:
            for message in consumer:
                print("received message = ", message)
                # commit the offset manually after reading the message
                consumer.commit()
        except KeyboardInterrupt:
            print("Aborted by user...")
        finally:
            consumer.close()
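The value_deserializer passed to KafkaConsumer above is simply a function from raw bytes to a Python object. As a hedged sketch, the lambda could be replaced by a named function that tolerates bad input. deserialize_json is a hypothetical name, and returning None for empty or malformed values is an assumption made for this example, not something kafka-python requires.

```python
import json


def deserialize_json(raw_bytes):
    """Decode a message value from UTF-8 bytes into a Python object.

    Returns None for a missing value or malformed payload instead of raising,
    so one bad message does not stop the consumer loop.
    """
    if raw_bytes is None:
        return None
    try:
        return json.loads(raw_bytes.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


print(deserialize_json(b'{"id": 1, "name": "test"}'))  # {'id': 1, 'name': 'test'}
print(deserialize_json(b"not json"))                   # None
```

To use it, pass value_deserializer=deserialize_json when constructing the KafkaConsumer and skip messages whose value is None.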
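# Running multiple consumers
# Note: activate_listener() blocks until consumer_timeout_ms passes with no new
# messages (or the user interrupts it), so consumer2 only starts after consumer1 stops.
# To consume in parallel, run each consumer in its own process or terminal.
broker = 'localhost:9092'
topic = 'test-topic'
group_id = 'consumer-1'

consumer1 = MessageConsumer(broker, topic, group_id)
consumer1.activate_listener()

consumer2 = MessageConsumer(broker, topic, group_id)
consumer2.activate_listener()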
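Because KafkaConsumer is not thread-safe and each listener blocks while it waits for messages, one way to run several consumers of the same group at once is to give each its own process. The following is a minimal sketch of that approach, not a production setup. The function names consume, build_consumer_processes, and run_consumers are hypothetical, and the broker address, topic, and group id are the same sample values used above.

```python
from multiprocessing import Process


def consume(broker, topic, group_id):
    """Run one consumer loop; meant to be the target of a single process."""
    # Imported inside the function so each child process builds its own client.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=broker,
        group_id=group_id,
        auto_offset_reset="earliest",
        consumer_timeout_ms=60000,  # stop after 60 s without new messages
    )
    try:
        for message in consumer:
            print(group_id, message.partition, message.offset, message.value)
    finally:
        consumer.close()


def build_consumer_processes(count, broker, topic, group_id):
    """Create (but do not start) one process per consumer."""
    return [
        Process(target=consume, args=(broker, topic, group_id), name=f"consumer-{i}")
        for i in range(count)
    ]


def run_consumers(count, broker, topic, group_id):
    """Start all consumer processes and wait for them to finish."""
    processes = build_consumer_processes(count, broker, topic, group_id)
    for process in processes:
        process.start()
    for process in processes:
        process.join()
```

To try it against a running broker, call run_consumers(2, 'localhost:9092', 'test-topic', 'consumer-1') under an if __name__ == "__main__": guard. With a shared group id and a topic that has at least two partitions, each process should receive messages from a different subset of partitions.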
Learn how to set up an Apache Kafka server on your local machine here for Windows and here for Mac/Ubuntu/Linux.