Write Data to a Kafka Topic in Go: Example

  • Last updated Apr 25, 2024

Go and Apache Kafka are a common and effective combination for building scalable, real-time data streaming applications. Kafka is a distributed event streaming platform, and Go is a powerful language for building applications that interact with Kafka clusters.

In this tutorial, you will learn how to write data to a Kafka topic in the Go programming language. You can also learn how to read data from a Kafka topic in Go here.

To send data to a Kafka topic, we will use Confluent's Kafka Golang library. It provides a high-level Producer and Consumer with support for balanced consumer groups for Apache Kafka 0.9 and above. An application that writes data to a Kafka topic is called a producer application.

Follow these steps to create a Go producer application for writing data to a Kafka topic using the Confluent Kafka Golang Library:

  1. Add the dependency:
  2. go get -u github.com/confluentinc/confluent-kafka-go/kafka

    The confluent-kafka-go library wraps the librdkafka C library via cgo, so a C compiler is required at build time. If you encounter the error `C compiler 'gcc' not found: exec: 'gcc': executable file not found in %PATH%` when installing the library on Windows, install TDM-GCC, which provides the required C compiler.
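    As a sketch, you can make the cgo requirement explicit when building (`CGO_ENABLED` is a standard Go toolchain variable; the exact build target depends on your project layout):

    ```shell
    # cgo must be enabled so the librdkafka bindings can compile
    CGO_ENABLED=1 go build ./...
    ```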

  3. Create the Kafka producer:
  4. package main
    
    import (
    	"encoding/json"
    	"fmt"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    // Data is the payload structure written to the Kafka topic.
    type Data struct {
    	Id          int
    	Message     string
    	Source      string
    	Destination string
    }
    
    func produce(message string, topicName string) {
    	fmt.Println("Starting producer...")
    	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    	if err != nil {
    		panic(err)
    	}
    	defer p.Close()
    
    	// Delivery report handler for produced messages
    	go func() {
    		for e := range p.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
    				} else {
    					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
    				}
    			}
    		}
    	}()
    
    	// Produce the message to the topic (asynchronously)
    	topic := topicName
    	err = p.Produce(&kafka.Message{
    		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    		Value:          []byte(message),
    	}, nil)
    	if err != nil {
    		fmt.Printf("Failed to enqueue message: %v\n", err)
    	}
    
    	// Wait for outstanding deliveries before shutting down (up to 15 seconds)
    	p.Flush(15 * 1000)
    }
    
    func main() {
    	topicName := "myTopic"
    
    	data := []Data{
    		{Id: 2, Message: "World", Source: "1", Destination: "A"},
    		{Id: 2, Message: "Earth", Source: "1", Destination: "B"},
    		{Id: 2, Message: "Planets", Source: "2", Destination: "C"},
    	}
    	stringJson, err := json.Marshal(data)
    	if err != nil {
    		panic(err)
    	}
    
    	fmt.Println(string(stringJson))
    
    	produce(string(stringJson), topicName)
    }
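
    Since the Data struct fields carry no `json` tags, `encoding/json` uses the exported field names as the JSON keys. A minimal, self-contained sketch of the payload format (no Kafka broker required):

    ```go
    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    // Mirrors the Data struct used by the producer.
    type Data struct {
    	Id          int
    	Message     string
    	Source      string
    	Destination string
    }

    func main() {
    	data := []Data{{Id: 2, Message: "World", Source: "1", Destination: "A"}}
    	b, err := json.Marshal(data)
    	if err != nil {
    		panic(err)
    	}
    	// Keys are the exported field names because no json tags are set.
    	fmt.Println(string(b))
    	// → [{"Id":2,"Message":"World","Source":"1","Destination":"A"}]
    }
    ```

    To customize the keys (e.g. lowercase names), add `json:"..."` tags to the struct fields.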

    The producer application is ready. Now start the Kafka services and create a topic named 'myTopic' on your local machine, then run the application.
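
    As a sketch, assuming a local Apache Kafka installation with its scripts on the PATH and a broker listening on localhost:9092 (script names and flags follow the standard Kafka distribution):

    ```shell
    # Create the topic the producer writes to
    kafka-topics.sh --create --topic myTopic \
      --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

    # Run the producer application
    go run main.go

    # Optionally verify the data arrived, reading the topic from the beginning
    kafka-console-consumer.sh --topic myTopic \
      --bootstrap-server localhost:9092 --from-beginning
    ```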