Write Data to a Kafka Topic using Confluent Kafka in Python
The Kafka Producer is responsible for sending messages to a Kafka topic.
This tutorial will guide you through the process of creating a Kafka Producer in Python using the Confluent Kafka library to write data to a Kafka topic.
You can also learn how to read data from a Kafka topic here.
Follow these steps to install the Confluent Kafka library and create a producer application that sends data to a Kafka topic in Python:
- Installing Confluent Kafka:
- Creating Producer:
Install the latest version of the confluent-kafka Python library:
pip install confluent-kafka
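To confirm that the installation worked, a quick check along these lines can help. This is a minimal sketch: the helper name `installed_kafka_client_version` is hypothetical, and it assumes `confluent_kafka.version()` returns a tuple whose first element is the version string.

```python
def installed_kafka_client_version():
    """Return the confluent-kafka version string, or None if it is not installed."""
    try:
        import confluent_kafka
    except ImportError:
        return None
    # version() returns a (version_string, version_int) tuple.
    return confluent_kafka.version()[0]


if __name__ == "__main__":
    version = installed_kafka_client_version()
    if version is None:
        print("confluent-kafka is not installed; run: pip install confluent-kafka")
    else:
        print("confluent-kafka version:", version)
```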
Here's an example of a Kafka producer in Python, implemented with the confluent-kafka library:
from confluent_kafka import Producer


class ExampleProducer:
    broker = "localhost:9092"
    topic = "test-topic"

    def __init__(self):
        # Connection settings; adjust the broker address for your cluster.
        self.producer = Producer({
            'bootstrap.servers': self.broker,
            'socket.timeout.ms': 100,
            # The next two settings are intended for brokers older than 0.10.0;
            # consider removing them when connecting to a current Kafka cluster.
            'api.version.request': 'false',
            'broker.version.fallback': '0.9.0',
        })

    def delivery_report(self, err, msg):
        """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(
                msg.topic(), msg.partition()))

    def send_msg_async(self, msg):
        print("Send message asynchronously")
        self.producer.produce(
            self.topic,
            msg,
            callback=self.delivery_report,
        )
        # Serve delivery callbacks for earlier messages without blocking.
        self.producer.poll(0)

    def send_msg_sync(self, msg):
        print("Send message synchronously")
        self.producer.produce(
            self.topic,
            msg,
            callback=self.delivery_report,
        )
        # Block until the message is delivered or delivery fails.
        self.producer.flush()


# SENDING DATA TO KAFKA TOPIC
example_producer = ExampleProducer()
message = "Hello this message will be sent to the kafka topic."
example_producer.send_msg_async(message)
# Wait for outstanding messages before the script exits.
example_producer.producer.flush()
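The code above shows how to create a Kafka producer class, configure it, and send messages to a given Kafka topic. Note that produce() itself is always asynchronous: it adds the message to a local queue and returns immediately. Delivery callbacks run when poll() or flush() is called, and flush() blocks until every queued message has been delivered or has failed. Choose the asynchronous or synchronous method depending on whether your application needs to wait for that confirmation.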
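When sending many messages, a common pattern is to call poll(0) after each produce() so that delivery callbacks are served along the way, and to call flush() once at the end. The sketch below illustrates this under some assumptions: `send_batch` and `on_delivery` are hypothetical names that are not part of the tutorial's class, and `producer` is any object exposing the `produce`, `poll`, and `flush` methods of `confluent_kafka.Producer`. It also retries once when `produce()` raises `BufferError`, which the library raises when its local queue is full.

```python
def on_delivery(err, msg):
    """Delivery callback: report the outcome of each message."""
    if err is not None:
        print("Message delivery failed: {}".format(err))
    else:
        print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))


def send_batch(producer, topic, messages):
    """Queue every message, serving callbacks as we go, then wait for delivery.

    Returns whatever flush() returns: the number of messages still queued.
    """
    for message in messages:
        try:
            producer.produce(topic, message, callback=on_delivery)
        except BufferError:
            # Local queue is full: give the client time to drain it, then retry once.
            producer.poll(1)
            producer.produce(topic, message, callback=on_delivery)
        # Non-blocking: triggers callbacks for messages already acknowledged.
        producer.poll(0)
    # Blocks until all queued messages are delivered or have failed.
    return producer.flush()
```

With a running broker, this could be called as `send_batch(Producer({'bootstrap.servers': 'localhost:9092'}), 'test-topic', ['first', 'second'])`, reusing the broker address and topic name from the example above.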