Write Data to a Kafka Topic in Go programming language Example
Using Go with Apache Kafka is a common and effective combination for building scalable and real-time data streaming applications. Kafka is a distributed event streaming platform, and Go is a powerful language for building applications that can interact with Kafka clusters.
In this tutorial, you will learn how to write data to a Kafka topic in Go programming language. You can also learn how to read data from a Kafka topic in Go here.
To send data to a Kafka topic, we will use Confluent's Kafka Golang library. It provides a high level Producer and Consumer with support for balanced consumer groups of Apache Kafka 0.9 and above. An application that writes data to a Kafka topic is called a Producer Application.
Follow these steps to create a Go producer application for writing data to a Kafka topic using the Confluent Kafka Golang Library:
- Adding Dependencies:
- Creating the Kafka Producer:
go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
If you encounter the error 'C compiler 'gcc' not found: exec: 'gcc': executable file not found in %PATH%' when installing the confluent-kafka-go library on Windows, install TDM-GCC, which provides the required C compiler.
package main
import (
"fmt"
"encoding/json"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func produce(messages string, topicName string) {
fmt.Printf("Starting producer...")
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := topicName
data := []byte(messages)
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: data,
}, nil)
// Wait for message deliveries before shutting down
p.Flush(15 * 1000)
}
//Send data to Kafka topic
type Data struct {
Id int
Message string
Source string
Destination string
}
func main() {
topicName := "myTopic"
data := []Data{
{Id: 2, Message: "World", Source: "1", Destination: "A"},
{Id: 2, Message: "Earth", Source: "1", Destination: "B"},
{Id: 2, Message: "Planets", Source: "2", Destination: "C"},
}
stringJson, _ := json.Marshal(data)
fmt.Println(string(stringJson))
produce(string(stringJson), topicName)
}
The Producer application is ready. Now, run the Kafka services and create a topic named 'myTopic' on your local machine.