Getting Started with Kafka in Golang

Go and Apache Kafka official logo

This post sums up my experience from the past two weeks of getting my hands dirty with Apache Kafka. Although my CTO introduced me to Kafka several months ago, I still had trouble producing and consuming data with it, and only recently found the spare time to look at it again. The hardest part I encountered was setting up a Kafka cluster with Docker: while there are several open-source Docker images out there, most of them only show how to run Kafka on a single node.

In this post I will briefly explain what Kafka is, how to set it up using Docker, and how to create a simple program that produces and consumes messages from Kafka.

The backstory of this post is the question of “how to make an API save data faster, so that latency drops and more clients can be served.” This post is also intended as an internal engineering talk at Qiscus (yes, at Qiscus we periodically hold internal sharing sessions about what we’ve learned so far). So, let’s start with what Apache Kafka itself is.

What Is Apache Kafka?

Kafka Architecture. Source: http://kth.diva-portal.org/smash/get/diva2:813137/FULLTEXT01.pdf

Apache Kafka is an open-source stream-processing software platform that started out at LinkedIn. It’s written in Scala and Java. Kafka is built around a commit log: it stores a log of records and keeps track of what’s happening. This commit log is similar to the one a common RDBMS uses.

To make it easier to imagine, it’s like a queue where you always append new data to the tail.
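To make the append-only idea concrete, here is a minimal in-memory sketch in Go (no Kafka client involved, just the shape of a commit log): records are only ever appended at the tail, and reading at an offset never changes the log.

```go
package main

import "fmt"

// commitLog is an append-only list of records, like one Kafka partition.
type commitLog struct {
	records []string
}

// append adds a record at the tail and returns its offset.
func (l *commitLog) append(rec string) int {
	l.records = append(l.records, rec)
	return len(l.records) - 1
}

// read returns the record at a given offset; the log never changes.
func (l *commitLog) read(offset int) string {
	return l.records[offset]
}

func main() {
	l := &commitLog{}
	l.append("first")
	l.append("second")
	off := l.append("third")
	fmt.Println(off)       // 2
	fmt.Println(l.read(0)) // first
}
```

Because old records stay in place, a reader can come back later and re-read from offset 0, which is exactly what lets Kafka consumers replay data.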

How Kafka stores a log, image source: https://www.confluent.io/blog/okay-store-data-apache-kafka/

While this seems simple, Kafka can maintain the records across several partitions of the same topic. According to the Kafka documentation, a topic is a category or feed name to which records are published. So, rather than writing into a single queue like the one in the image above, Kafka can write into several queues under the same topic name.

For each topic, the Kafka cluster maintains a partitioned log that looks like this. Source: https://kafka.apache.org/documentation/#design

Imagine a topic as the name of a hotel, and its partitions as that hotel’s three elevators, all going in the same direction, say up. On a busy day, when a guest wants to go upstairs, they pick whichever of the three elevators has the fewest passengers. By the same logic, when data arrives, we can tell Kafka to write it into a specific partition.
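In practice, Kafka clients typically pick the partition for a keyed message by hashing the key, so the same key always lands in the same partition (messages without a key are usually spread round-robin). Here is a minimal sketch of that idea in plain Go (the FNV hash is illustrative, not the exact algorithm any client uses):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickPartition hashes the key so the same key always maps to the
// same partition; which partition that is depends only on the hash.
func pickPartition(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % numPartitions
}

func main() {
	// "guest-1" appears twice and always gets the same partition
	for _, key := range []string{"guest-1", "guest-2", "guest-1"} {
		fmt.Printf("%s -> partition %d\n", key, pickPartition(key, 4))
	}
}
```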

That’s how Kafka handles incoming data. Now, let’s see how we can read, or consume, the data from Kafka.

In Kafka you can consume data from a specific partition of a topic, or from all partitions. Interestingly, you can subscribe with several clients/workers and have each of them retrieve different data from different partitions by using a consumer group. So, what is a consumer group?

A consumer group is like a label for a group of consumers. Consumers under the same label each consume different messages. Put more simply, it’s like when your teacher at school asks you to form groups of three. The teacher then gives each group a name: group 1 is “Tiger”, another is “Apple”, and so on. From then on, you and your two friends are recognized as one entity rather than three.

Kafka consumer group. Source: https://kafka.apache.org/documentation/#design

In Kafka, once consumers are grouped under one label, each of them consumes different messages, so their work is never redundant. Say I joined group “Hore” with two friends. My friends and I will not grab the same things to process. For example, if our group is assigned to sweep the classroom, we may split the job: I sweep the left side, one friend takes the middle, and the other takes the right side of the class. Just like pupils in Japan wiping the classroom floor together:
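In code terms, the sweeping analogy is just partition assignment: within one group, each partition is owned by exactly one consumer. A round-robin sketch in plain Go (real Kafka assignment strategies are more involved than this):

```go
package main

import "fmt"

// assign spreads partitions over the consumers of one group so that
// each partition is owned by exactly one consumer.
func assign(partitions int, consumers []string) map[string][]int {
	out := make(map[string][]int)
	for p := 0; p < partitions; p++ {
		c := consumers[p%len(consumers)]
		out[c] = append(out[c], p)
	}
	return out
}

func main() {
	// 4 partitions shared by a group of 3 workers
	fmt.Println(assign(4, []string{"worker-a", "worker-b", "worker-c"}))
	// map[worker-a:[0 3] worker-b:[1] worker-c:[2]]
}
```

Note that with 4 partitions and 3 consumers, one consumer owns two partitions; with more consumers than partitions, some consumers would sit idle.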

Students wipe the floor. Source: http://lucky-japan.blogspot.com/2014/10/japanese-students-clean-classrooms-on.html

You can read more about the design behind Kafka here:

Now you have an idea of how Kafka’s topics work. Setting its internal design aside, I will now show you how to use Kafka’s publish (produce) and subscribe (consume) features to create a worker.

To make it easy to follow, we’ll use a case study: a system that receives a message through a REST API, then consumes and processes it in workers. The processing could be sending email, pushing notifications, saving to a database, or any other time-consuming task.

Setup Kafka With Docker

I will use Docker and Docker Compose to set up a Kafka cluster in my local environment. Here’s the Docker version I used while writing this post:

Client:
  Version:       18.06.0-ce
  API version:   1.38
  Go version:    go1.10.3
  Git commit:    0ffa825
  Built:         Wed Jul 18 19:11:02 2018
  OS/Arch:       linux/amd64
  Experimental:  false

Server:
  Engine:
    Version:       18.06.0-ce
    API version:   1.38 (minimum version 1.12)
    Go version:    go1.10.3
    Git commit:    0ffa825
    Built:         Wed Jul 18 19:09:05 2018
    OS/Arch:       linux/amd64
    Experimental:  false

and my docker-compose version:

docker-compose version 1.21.2, build a133471
docker-py version: 3.3.0
CPython version: 3.6.5
OpenSSL version: OpenSSL 1.0.1t 3 May 2016

You may have a newer version than mine, but that should be fine as long as it can run a version 2 docker-compose YAML file.

In this tutorial, I use the Docker images from Confluent: https://github.com/confluentinc/cp-docker-images.

Long story short: I previously used wurstmeister/zookeeper-docker and wurstmeister/kafka-docker, but that Zookeeper image doesn’t support clustering mode, so I switched to 31z4/zookeeper-docker. After trying it, though, I had a problem publishing messages to some partitions. Specifically, when I created a topic called “foo” with 4 partitions and a replication factor of 2, I could only publish messages to partitions 1 and 3, while publishing to partitions 0 and 2 kept returning an error like this:

Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.

Maybe it’s not the docker image’s fault; maybe it’s just my misconfiguration that kept Kafka from running as expected. Here’s my problematic docker-compose.yml using wurstmeister/kafka-docker and 31z4/zookeeper-docker (maybe you can check it and let me know if I configured it wrong):

Luckily, I found a gist by Matt Howlett that runs Kafka and Zookeeper in cluster mode using the Confluent Platform. Here is the docker-compose file I use to test Kafka in this post:

Now you can run that file with MY_IP=your-ip docker-compose up. You can find your IP by running ifconfig -a in your terminal.

After all services are up, you can create a topic, for example “foo”, with this command:

docker run --net=host --rm confluentinc/cp-kafka:5.0.0 kafka-topics --create --topic foo --partitions 4 --replication-factor 2 --if-not-exists --zookeeper localhost:32181

It will create the topic “foo” with 4 partitions and a replication factor of 2. If it succeeds, the command above will return something like this:

The left side is the docker logs while running Kafka and Zookeeper. The right side is the command to create new topic in Kafka.

Test using kafkacat

Now you have Kafka and Zookeeper successfully running in cluster mode. Before writing any code, you might wonder whether it is really functional. For that, you can use a command-line tool named “kafkacat”, which you can install with your favorite package manager such as brew or apt.

First, open new terminal and type:

kafkacat -C -b localhost:19092,localhost:29092,localhost:39092 -t foo -p 0

It will listen to topic “foo” on partition 0 (Kafka starts partition indexes at 0).

Then, from another terminal, you can publish a message specifically to partition 0 with this command:

echo 'publish to partition 0' | kafkacat -P -b localhost:19092,localhost:29092,localhost:39092 -t foo -p 0

If it succeeds, the first command will receive the “publish to partition 0” message sent by the second command. Repeat this for partitions 1, 2, and 3 to ensure every partition can receive messages and the consumer can subscribe to them.

Testing all partition can read and write the message using kafkacat.

As the figure above shows, when I send a message to a specific partition, it is received only by the consumer of that partition.

Writing Golang-based REST API

There you have it: a working Kafka cluster and a tool to verify that it is running well. Now you can start writing the code that pushes messages into Kafka.

To connect to Kafka, I use segmentio/kafka-go, since Shopify/sarama does not support consumer groups. Even with bsm/sarama-cluster, there is still a problem subscribing to Kafka version 2, as described in this issue: https://github.com/Shopify/sarama/issues/1144#issuecomment-412066477. So, after two weeks of trying both Shopify/sarama and bsm/sarama-cluster, I settled on segmentio/kafka-go for both the consumer and the producer. (Yes, you read that right, I moved to segmentio/kafka-go on the very day I wrote this post 😅.)

To serve and handle HTTP requests, I will use gin-gonic/gin as the router. With it, you can create an endpoint as easily as this snippet:

gin.SetMode(gin.ReleaseMode)
router := gin.New()
router.POST("/api/v1/data", postDataToKafka)
return router.Run(listenAddr)

The main part of publishing messages to Kafka is creating the connection first; then you can produce messages over that established connection. You can find this code here: https://github.com/yusufsyaifudin/go-kafka-example/blob/accadecfca65c956bf03a6199ff1f3944bc6d7dc/dep/kafka/config.go

Once the connection is established and stored in the writer variable, it can be used to push messages into Kafka with this code:

package kafka

import (
	"context"
	"time"

	"github.com/segmentio/kafka-go"
)

func Push(parent context.Context, key, value []byte) (err error) {
	message := kafka.Message{
		Key:   key,
		Value: value,
		Time:  time.Now(),
	}
	return writer.WriteMessages(parent, message)
}

Then you call it using:

err := kafka.Push(context.Background(), nil, []byte("your message content"))

Since this post would be full of code if I explained it line by line, you can take a look at the full code here:

Anyway, I assume you’ve got the idea of publishing messages into Kafka. It’s just like common API handling: accept the request, validate it, and save the data, only now it goes into Kafka instead of straight into a database. Later, rather than ending the data’s life-cycle in Kafka, we need a worker that consumes it and writes it into a database, for these reasons:

  • it is hard to query data directly from Kafka (although nowadays you can do it using KSQL)
  • if you keep the data in Kafka forever, every new consumer group will consume all of it from the beginning, which can be very large (even though it is okay to store data in Apache Kafka forever)

Writing the Consumer as a Worker

Writing the consumer is much simpler. The main idea is just to retrieve data from Kafka and then process it. Each consumer group can do a different kind of work.

cmd/worker/main.go Source: https://github.com/yusufsyaifudin/go-kafka-example/blob/accadecfca65c956bf03a6199ff1f3944bc6d7dc/cmd/worker/main.go

If we look at the code above, the idea is straightforward:

  • We read configuration such as the Kafka broker URLs, the topic this worker should listen to, the consumer group ID, and the client ID from environment variables or program arguments.
  • Then we connect to Kafka to subscribe to a particular topic in lines 42–52. In line 52, you may notice a deferred reader.Close(). It tells the application to close the connection to Kafka when the program exits.
  • In lines 55–73 we wrap the ReadMessage function in an endless loop. This keeps the program listening for incoming messages.
  • In line 63, we check the compression codec of the incoming message. Since the publisher uses Snappy compression, we must decode it with Snappy too.
  • Line 72 is the final stage where we actually read the message content. From this point on, you can process the data however you like: unmarshal it into a Go struct, write it to a persistent database, send it to Elasticsearch for indexing, or anything else.
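Stripped of the Kafka specifics, the steps above boil down to one loop. In the sketch below the reader is hidden behind a small interface so the shape is clear without a broker; in the real code it is kafka-go’s Reader with its ReadMessage(ctx) method:

```go
package main

import (
	"errors"
	"fmt"
)

// messageReader is the one method the loop needs; kafka-go's Reader
// fills the same role in the real worker.
type messageReader interface {
	ReadMessage() ([]byte, error)
}

// runWorker loops until the reader reports an error, handing every
// message to process, which is where you would write to a database,
// Elasticsearch, and so on.
func runWorker(r messageReader, process func([]byte)) error {
	for {
		msg, err := r.ReadMessage()
		if err != nil {
			return err
		}
		process(msg)
	}
}

// sliceReader replays a fixed list of messages, standing in for Kafka.
type sliceReader struct{ msgs [][]byte }

func (s *sliceReader) ReadMessage() ([]byte, error) {
	if len(s.msgs) == 0 {
		return nil, errors.New("no more messages")
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

func main() {
	r := &sliceReader{msgs: [][]byte{[]byte("a"), []byte("b")}}
	runWorker(r, func(m []byte) { fmt.Println(string(m)) })
}
```

Against a real broker the loop never ends on its own; it simply blocks in ReadMessage until the next message arrives.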

So, Why Using Kafka?

You may still have a question: why use Kafka instead of doing the processing right in the controller? Can’t we just write to the database, cache, and index service right after we receive the data? Or: why not use Redis’s pub/sub feature, isn’t it fast enough since everything is processed in RAM?

I once thought the same, and it does depend on your use case. But consider this:

  • If you do the processing in the controller, it adds large latency for your clients, since each request must wait until all the processing is really done. You might think this can be worked around with a goroutine or some other mechanism for spawning threads. But if the processing takes a long time, you end up with a system that is always busy handling background work, which may exhaust either your CPU or your RAM. Writing to a database already takes time; now add the time to write to Elasticsearch, Redis, and so on.
  • Redis pub/sub may be fine if you only need notifications and can tolerate message loss. Redis pub/sub messages are not queued, let alone persisted: they are only buffered in the socket buffers and sent to the subscribers immediately, in the same event-loop iteration as the publication. If a subscriber fails to read a message, that message is lost for it. In Kafka, messages are retained for days or months, depending on your log retention configuration. So when a consumer (worker) fails to process particular data, it can read it again from the beginning, as long as the data is still within the retention window.
Kafka’s official website states: “Kafka will perform the same whether you have 50 KB or 50 TB of persistent data on the server.” Source: https://kafka.apache.org/intro
  • Kafka’s performance is guaranteed to stay the same whether it holds small data sets or grows into terabytes of data, because Kafka avoids trees (B-trees or LSM trees) in its internal structure. It uses a queue mechanism, where data is only appended to the system. To dig deeper into why Kafka is so fast, here are some links you may find interesting:

Yogyakarta, 11 August 2018. Written at the UGM library and continued at Jogja City Library.

Yogyakarta — Indonesia