Reading Data from a Kafka Topic using Confluent Kafka in Python
In this tutorial, you will learn how to read data from a Kafka topic in Python.
To read data from a Kafka topic, we will use confluent-kafka, Confluent's Python client library for Apache Kafka. It provides high-level Producer, Consumer, and AdminClient APIs.
An application that reads data from a Kafka topic is called a Consumer application.
You can also find code to write data to a Kafka topic here.
Follow these steps to install the Confluent Kafka library and write consumer code that reads data from a Kafka topic in Python:
Install Confluent Kafka
Install the confluent-kafka Python library:
pip install confluent-kafka
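To verify the installation, you can print the client and underlying librdkafka versions (a quick check, assuming the package was installed into the Python environment you are running):
from confluent_kafka import version, libversion

# version() and libversion() each return a (version string, version int) tuple
print("confluent-kafka version :", version()[0])
print("librdkafka version      :", libversion()[0])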
Consumer
Consumer code to read data from a Kafka topic:
from confluent_kafka import Consumer
from time import sleep


class ExampleConsumer:
    broker = "localhost:9092"
    topic = "test-topic"
    group_id = "consumer-1"

    def start_listener(self):
        consumer_config = {
            'bootstrap.servers': self.broker,
            'group.id': self.group_id,
            'auto.offset.reset': 'latest',
            'enable.auto.commit': False,
            'max.poll.interval.ms': 86400000
        }

        consumer = Consumer(consumer_config)
        consumer.subscribe([self.topic])

        try:
            while True:
                print("Listening")
                # read a single message at a time
                msg = consumer.poll(0)

                if msg is None:
                    # no message available yet, wait and poll again
                    sleep(5)
                    continue

                if msg.error():
                    print("Error reading message : {}".format(msg.error()))
                    continue

                # You can parse the message and save it to a database here
                print(msg.value())
                consumer.commit()
        except Exception as ex:
            print("Kafka Exception : {}".format(ex))
        finally:
            print("Closing consumer")
            consumer.close()


# Run the consumer to read messages from the Kafka topic
my_consumer = ExampleConsumer()
my_consumer.start_listener()
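Inside the polling loop, the Message object returned by poll() exposes the record payload and its metadata. The sketch below shows what the "parse and save" step might look like; the handle_record name and the UTF-8 decoding are assumptions, so adjust them to your data format:
def handle_record(msg):
    # msg.value() and msg.key() return raw bytes (or None); decoding as UTF-8 is an assumption
    value = msg.value().decode("utf-8") if msg.value() else None
    key = msg.key().decode("utf-8") if msg.key() else None
    print("topic={} partition={} offset={} key={} value={}".format(
        msg.topic(), msg.partition(), msg.offset(), key, value))
    # parse the value and save it to your database here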
The Kafka consumer application is now ready. Next, start the Kafka services and create a topic named test-topic on your local machine.
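You can create the topic with the Kafka command-line tools, or directly from Python with the library's AdminClient. Below is a minimal sketch, assuming a broker on localhost:9092 and that a single partition with replication factor 1 is enough for local testing:
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# create_topics() returns a dict mapping topic name -> future
futures = admin.create_topics([NewTopic("test-topic", num_partitions=1, replication_factor=1)])
for topic, future in futures.items():
    try:
        future.result()  # raises an exception if creation failed (e.g. the topic already exists)
        print("Created topic {}".format(topic))
    except Exception as ex:
        print("Failed to create topic {} : {}".format(topic, ex))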
If you do not already have an Apache Kafka server set up on your local machine, learn how to do it here for Windows and here for Mac/Ubuntu/Linux.