Sending Data to a Kafka Topic using kafka-python - Producer Example
In this tutorial, you will learn how to create a producer application that sends data to a Kafka topic using the kafka-python client library.
kafka-python is a Python client for Apache Kafka. It is designed to work much like the official Java client.
kafka-python is best used with newer Kafka brokers (0.9+), but it is backwards compatible with older versions (down to 0.8.0). Some features, such as dynamic partition assignment to multiple consumers in the same group and rebalancing of partition assignments among consumers after failures, are supported only on newer brokers (0.9+).
To learn how to read data from a Kafka topic, read here.
Follow these steps to complete this example:
Installing kafka-python
Install the kafka-python library:
pip install kafka-python
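To confirm that the install succeeded before writing any producer code, you can look up the package version from Python. The following is a minimal sketch, assuming Python 3.8 or later (for `importlib.metadata`); the helper name `installed_version` is hypothetical and not part of kafka-python.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version string of a pip package, or None if absent."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


# Prints the version string if kafka-python is installed, otherwise None.
print("kafka-python version:", installed_version("kafka-python"))
```

If this prints `None`, the package was probably installed into a different Python environment than the one running the script.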
Creating the Kafka Producer
A producer application implements the KafkaProducer API to send data to a Kafka topic. KafkaProducer is an asynchronous, high-level message/data producer. KafkaProducer is thread-safe.
from kafka import KafkaProducer
import json


class MessageProducer:
    broker = ""
    topic = ""
    producer = None

    def __init__(self, broker, topic):
        self.broker = broker
        self.topic = topic
        # Serialize each value to UTF-8 encoded JSON, wait for all in-sync
        # replicas to acknowledge, and retry failed sends up to 3 times.
        self.producer = KafkaProducer(bootstrap_servers=self.broker,
                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                      acks='all',
                                      retries=3)

    def send_msg(self, msg):
        print("sending message...")
        try:
            future = self.producer.send(self.topic, msg)
            self.producer.flush()
            # Block until the broker acknowledges the record (or 60 seconds pass).
            future.get(timeout=60)
            print("message sent successfully...")
            return {'status_code': 200, 'error': None}
        except Exception as ex:
            # Report failures in the same shape as the success response
            # (the 500 code is an illustrative choice, not a Kafka status).
            return {'status_code': 500, 'error': str(ex)}


broker = 'localhost:9092'
topic = 'test-topic'
message_producer = MessageProducer(broker, topic)

data = {'name': 'abc', 'email': '[email protected]'}
resp = message_producer.send_msg(data)
print(resp)
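The example above waits for each message by calling `future.get()`, which makes every send effectively synchronous. Because KafkaProducer is asynchronous, you can instead attach callbacks to the future returned by `send()` and let the producer deliver in the background. The following is a minimal sketch of that pattern; the helper names `send_async`, `on_send_success`, `on_send_error`, and `main` are hypothetical, and the broker address and topic are assumed to match the example above.

```python
import json


def on_send_success(record_metadata):
    # record_metadata carries the topic, partition and offset of the stored record.
    print("delivered to", record_metadata.topic,
          "partition", record_metadata.partition,
          "offset", record_metadata.offset)


def on_send_error(exc):
    # Called with the exception if the send ultimately fails.
    print("delivery failed:", exc)


def send_async(producer, topic, value, on_success, on_error):
    """Queue one message and return immediately, without waiting for the result.

    `producer` is expected to behave like kafka-python's KafkaProducer:
    send() returns a future that supports add_callback() and add_errback().
    """
    future = producer.send(topic, value)
    future.add_callback(on_success)
    future.add_errback(on_error)
    return future


def main():
    # Imported here so the helpers above can be loaded without kafka-python.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',  # assumed local broker
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    for i in range(3):
        send_async(producer, 'test-topic', {'id': i},
                   on_send_success, on_send_error)
    # flush() blocks until all queued messages have been sent or have failed.
    producer.flush()
    producer.close()
```

Calling `main()` requires a running broker. The trade-off is that delivery results arrive later, in the callbacks, so the caller can no longer return a per-message status the way `send_msg` does.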
Learn how to set up an Apache Kafka server on your local machine here for Windows and here for Mac/Ubuntu/Linux.