Creating a Distributed KV Database by Implementing Raft Consensus Using Golang

Yusuf Syaifudin
May 30, 2020


You may have heard a lot about "distributed systems" or "distributed databases", but you may wonder how to create one, or what the internals look like. In this post, I will try to explain how to build one using Raft consensus. The post focuses more on how to code and test a Raft cluster than on the internals of Raft itself.

What is Raft Consensus?

When it comes to distributed systems, we need a consensus to agree on some values. Since the servers are located in different places, it is hard to maintain that agreement. Imagine that you and your workmates want to hold a discussion to solve a problem. When you are all in the same meeting room, you can just wave your hand when you have an idea. But when you work from home, there are barriers that prevent you from speaking, such as a slow internet connection or your friends talking at the same time.

To deal with this, you may create some rules, e.g. only one person talks at a time, everyone else must mute their mic, and so on. Such a set of rules is a consensus that lets the group reach agreement.

In Raft consensus, any message broadcast to all nodes must be sent by the Leader. The Leader is elected by a voting mechanism. You can search on Google for how Raft elects its leader; the articles on Consul and Vault already explain most of the common terms used in Raft.

Let’s Implement Raft

I will use the library from Hashicorp to create the Raft cluster, and persist the data on each node using BadgerDB. You may have already heard about the Finite State Machine (FSM) in Raft: it is the place where you process the data on the local node.

First, notice (and be aware) that every operation in Raft consensus must go through the Leader. So, we need to ensure that all requests are sent to the Leader node. The Leader then sends the command to all Followers and waits for a majority of the servers to process it. The number of servers that counts as a "majority" is called the quorum: with N servers, the quorum is (N/2)+1. Each follower then does the following:

  1. After receiving the command, it writes the data as log entries in write-ahead-log fashion.
  2. After successfully writing the log entries, the data is passed to the FSM, where you must process it in a deterministic way.
  3. After the FSM successfully processes the data, it returns a result, and the Leader is notified that this node has succeeded. Once the Leader sees that enough Followers report success, it tells the client that the data has been "distributed" to the quorum.

So, here is the flow diagram:

Image source: https://www.sofastack.tech/en/projects/sofa-jraft/consistency-raft-jraft/

Coding!

Okay, we now know the high-level data flow in the hashicorp/raft library and what happens before the client receives an "ok" from the Leader server. Now, let me walk through the implementation in code, following the three steps above one by one.

1. Create the servers! In this example, we will create two servers: an HTTP server and an RPC server. The HTTP server is how we interact with each node, while the RPC server is the built-in server from the hashicorp/raft library that communicates and exchanges commands between the Leader and the Follower(s).

Here is the snippet that creates the Raft instance:

raftServer, err := raft.NewRaft(raftConf, fsmStore, cacheStore, store, snapshotStore, transport)
if err != nil {
	log.Fatal(err)
	return
}
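
Everything passed into raft.NewRaft (the config, the FSM, a log store, a stable store, a snapshot store, and a network transport) has to be built first. Here is a minimal sketch of how those pieces might be wired together. The variable names mirror the snippet above, but the in-memory log/stable store, the hard-coded values, and the BootstrapCluster call at the end are simplifications of mine, not the exact code from the repository:

package main

import (
	"log"
	"net"
	"os"
	"time"

	"github.com/dgraph-io/badger/v2"
	"github.com/hashicorp/raft"

	"ysf/raftsample/fsm"
)

func main() {
	// These values come from the environment in the real program
	// (RAFT_NODE_ID, RAFT_PORT, RAFT_VOL_DIR); hard-coded here for brevity.
	nodeID, raftAddr, volDir := "node1", "127.0.0.1:1111", "node_1_data"

	raftConf := raft.DefaultConfig()
	raftConf.LocalID = raft.ServerID(nodeID)

	// BadgerDB is both our KV storage and the backing store for the FSM.
	badgerDB, err := badger.Open(badger.DefaultOptions(volDir))
	if err != nil {
		log.Fatal(err)
	}
	fsmStore := fsm.NewBadger(badgerDB)

	// Log store and stable store. The repository persists these too;
	// the in-memory store is used here only to keep the sketch short.
	store := raft.NewInmemStore()
	cacheStore, err := raft.NewLogCache(512, store)
	if err != nil {
		log.Fatal(err)
	}

	// Snapshot store on disk, used for log compaction.
	snapshotStore, err := raft.NewFileSnapshotStore(volDir, 2, os.Stdout)
	if err != nil {
		log.Fatal(err)
	}

	// The "RPC server" mentioned above: a TCP transport bound to RAFT_PORT.
	addr, err := net.ResolveTCPAddr("tcp", raftAddr)
	if err != nil {
		log.Fatal(err)
	}
	transport, err := raft.NewTCPTransport(raftAddr, addr, 3, 10*time.Second, os.Stdout)
	if err != nil {
		log.Fatal(err)
	}

	raftServer, err := raft.NewRaft(raftConf, fsmStore, cacheStore, store, snapshotStore, transport)
	if err != nil {
		log.Fatal(err)
	}

	// On the very first start, bootstrap a single-node cluster so this node
	// can elect itself as Leader until other nodes join.
	raftServer.BootstrapCluster(raft.Configuration{
		Servers: []raft.Server{
			{ID: raftConf.LocalID, Address: transport.LocalAddr()},
		},
	})
}

Bootstrapping a brand-new node as a single-node cluster is also why, in the Testing section below, every freshly started node reports itself as the Leader until we join them into one cluster.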

And here is the snippet that creates the HTTP server using the Echo framework:

e := echo.New()
e.HideBanner = true
e.HidePort = true
e.Pre(middleware.RemoveTrailingSlash())
e.GET("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux))

// Raft server
raftHandler := raft_handler.New(r)
e.POST("/raft/join", raftHandler.JoinRaftHandler)
e.POST("/raft/remove", raftHandler.RemoveRaftHandler)
e.GET("/raft/stats", raftHandler.StatsRaftHandler)

// Store server
storeHandler := store_handler.New(r, badgerDB)
e.POST("/store", storeHandler.Store)
e.GET("/store/:key", storeHandler.Get)
e.DELETE("/store/:key", storeHandler.Delete)

Let me briefly explain. In order to form a cluster, Raft needs to know about each node. That is why we create 3 endpoints under the /raft prefix: handlers to add (join) and remove a node (a Voter in Raft terms), plus one to see each node's statistics (or status).
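
As a rough sketch of what such a handler can look like (not the exact code in raft_handler; the struct and field names here are assumptions, and the Echo v4 import path is assumed as well), joining a node boils down to calling AddVoter on the Leader:

package raft_handler

import (
	"net/http"

	"github.com/hashicorp/raft"
	"github.com/labstack/echo/v4"
)

type handler struct {
	raft *raft.Raft
}

func New(r *raft.Raft) *handler {
	return &handler{raft: r}
}

type joinRequest struct {
	NodeID      string `json:"node_id"`
	RaftAddress string `json:"raft_address"`
}

// JoinRaftHandler adds a new Voter to the cluster; it only works on the Leader.
func (h *handler) JoinRaftHandler(c echo.Context) error {
	var req joinRequest
	if err := c.Bind(&req); err != nil {
		return c.JSON(http.StatusUnprocessableEntity, map[string]interface{}{"error": err.Error()})
	}

	if h.raft.State() != raft.Leader {
		return c.JSON(http.StatusUnprocessableEntity, map[string]interface{}{"error": "not the leader"})
	}

	future := h.raft.AddVoter(raft.ServerID(req.NodeID), raft.ServerAddress(req.RaftAddress), 0, 0)
	if err := future.Error(); err != nil {
		return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": err.Error()})
	}

	return c.JSON(http.StatusOK, map[string]interface{}{"message": req.NodeID + " joined"})
}

RemoveRaftHandler is the mirror image of this, calling h.raft.RemoveServer(raft.ServerID(req.NodeID), 0, 0), and StatsRaftHandler can simply return the map produced by h.raft.Stats().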

To store, fetch, and delete data, we also create 3 other endpoints: POST /store, GET /store/:key and DELETE /store/:key. I will show one of these endpoints later in step 4.

2. In step 2 above, I mentioned log entries. This is where hashicorp/raft saves every log related to any cluster change: adding nodes, adding services, new key-value pairs, etc. The entries are sequential, as already stated in the Consul docs:

Log — The primary unit of work in a Raft system is a log entry. The problem of consistency can be decomposed into a replicated log. A log is an ordered sequence of entries. Entries includes any cluster change: adding nodes, adding services, new key-value pairs, etc. We consider the log consistent if all members agree on the entries and their order.
https://www.consul.io/docs/internals/consensus.html#raft-protocol-overview

3. Finite State Machine. In hashicorp/raft, you must implement the raft.FSM interface to create the Finite State Machine. It consists of 3 functions:
* Apply is invoked once Raft has committed the log entry from step 2.
* Snapshot supports log compaction: it saves a point-in-time snapshot of the FSM.
* Restore restores an FSM from a snapshot.
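
For reference, the interface to satisfy looks like this (simplified from the hashicorp/raft source; see its godoc for the full comments):

type FSM interface {
	// Apply is invoked once a log entry is committed.
	Apply(*Log) interface{}

	// Snapshot returns an FSMSnapshot used to support log compaction.
	Snapshot() (FSMSnapshot, error)

	// Restore replaces the FSM state from a snapshot.
	Restore(io.ReadCloser) error
}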

You may have noticed in step 1 that, when creating the new Raft instance, we pass the variable fsmStore. It simply comes from calling fsm.NewBadger(badgerDB), an implementation backed by BadgerDB.
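
The full file is in the repository (linked in the next paragraph); here is a condensed sketch of the idea. CommandPayload, ApplyResponse and the helper names are written from memory and may differ slightly from the actual file, the BadgerDB v2 import path is assumed, and only SET and DELETE are shown (the real file also handles GET), so treat this as an illustration rather than the exact code:

package fsm

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"

	"github.com/dgraph-io/badger/v2"
	"github.com/hashicorp/raft"
)

// CommandPayload is the JSON command sent through raft.Apply (see step 4).
type CommandPayload struct {
	Operation string      `json:"operation"`
	Key       string      `json:"key"`
	Value     interface{} `json:"value"`
}

// ApplyResponse is what Apply returns to the caller via applyFuture.Response().
type ApplyResponse struct {
	Error error
	Data  interface{}
}

type badgerFSM struct {
	db *badger.DB
}

// NewBadger wraps an already opened BadgerDB handle into a raft.FSM.
func NewBadger(db *badger.DB) raft.FSM {
	return &badgerFSM{db: db}
}

// Apply is called on every node once the log entry is committed.
// It must be deterministic: every node processes the same command the same way.
func (b *badgerFSM) Apply(l *raft.Log) interface{} {
	if l.Type != raft.LogCommand {
		return nil
	}

	var payload CommandPayload
	if err := json.Unmarshal(l.Data, &payload); err != nil {
		return &ApplyResponse{Error: err}
	}

	switch strings.ToUpper(strings.TrimSpace(payload.Operation)) {
	case "SET":
		return &ApplyResponse{Error: b.set(payload.Key, payload.Value), Data: payload.Value}
	case "DELETE":
		return &ApplyResponse{Error: b.delete(payload.Key)}
	default:
		return &ApplyResponse{Error: fmt.Errorf("unknown operation: %s", payload.Operation)}
	}
}

// Snapshot and Restore are required by raft.FSM; a production implementation
// would stream the BadgerDB contents in and out here.
func (b *badgerFSM) Snapshot() (raft.FSMSnapshot, error) {
	return nil, fmt.Errorf("snapshot is not implemented in this sketch")
}

func (b *badgerFSM) Restore(rc io.ReadCloser) error {
	return rc.Close()
}

// set and delete are thin wrappers around BadgerDB transactions.
func (b *badgerFSM) set(key string, value interface{}) error {
	data, err := json.Marshal(value)
	if err != nil {
		return err
	}
	return b.db.Update(func(txn *badger.Txn) error {
		return txn.Set([]byte(key), data)
	})
}

func (b *badgerFSM) delete(key string) error {
	return b.db.Update(func(txn *badger.Txn) error {
		return txn.Delete([]byte(key))
	})
}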

In the Apply method you will see that it only handles raft.LogCommand entries whose data is a CommandPayload. This is because the library hands us a []byte slice, and in this program we pass a JSON string built from the CommandPayload structure (I will explain this in step 4).

You will also see that it handles the SET, GET, and DELETE commands using the functions b.set, b.get and b.delete. These are just wrapper functions to save, fetch, and delete data in BadgerDB. You can see the implementation at this link: https://github.com/yusufsyaifudin/raft-sample/blob/48254d0c71/fsm/badger.go#L18-L93

4. Let's take the POST /store handler as an example. As mentioned before, all Raft operations should be done on the Leader node, but in this program GET /store/:key does not go through the Leader; instead it reads from the local BadgerDB. This may return stale results, but that is intentional: it is what we call "eventual consistency".

In lines 37–41 of the handler, we check that the operation is being performed on the Leader node; otherwise the handler returns an error response. When the request is on the Leader node, it builds the payload using CommandPayload with the operation SET. This is encoded to JSON as a []byte slice and passed to raft.Apply in line 56. If you remember, in the FSM code we convert that []byte back into a CommandPayload; this is where that data comes from.

raft.Apply then broadcasts the command to all Raft nodes, and when enough Follower nodes report success doing the Apply on their respective machines, the Leader returns no error. Otherwise, the error is returned by applyFuture.Error().

In the FSM code we always return a pointer to ApplyResponse; if we want that value back, we can fetch it from applyFuture.Response() and try to convert it to *fsm.ApplyResponse.
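
Putting steps 1–4 together, here is a condensed sketch of the store handler pair described above: the write path that goes through raft.Apply on the Leader, and the local read path straight from BadgerDB that gives us the eventual consistency mentioned earlier. The struct names, the import paths, and the 500 ms Apply timeout are assumptions of mine; the repository version has more validation and error handling:

package store_handler

import (
	"encoding/json"
	"net/http"
	"time"

	"github.com/dgraph-io/badger/v2"
	"github.com/hashicorp/raft"
	"github.com/labstack/echo/v4"

	"ysf/raftsample/fsm"
)

type handler struct {
	raft *raft.Raft
	db   *badger.DB
}

func New(r *raft.Raft, db *badger.DB) *handler {
	return &handler{raft: r, db: db}
}

type storeRequest struct {
	Key   string      `json:"key"`
	Value interface{} `json:"value"`
}

// Store handles POST /store: it must run on the Leader and goes through raft.Apply.
func (h *handler) Store(c echo.Context) error {
	var req storeRequest
	if err := c.Bind(&req); err != nil {
		return c.JSON(http.StatusUnprocessableEntity, map[string]interface{}{"error": err.Error()})
	}

	// Only the Leader is allowed to replicate a write.
	if h.raft.State() != raft.Leader {
		return c.JSON(http.StatusUnprocessableEntity, map[string]interface{}{"error": "not the leader"})
	}

	payload := fsm.CommandPayload{Operation: "SET", Key: req.Key, Value: req.Value}
	data, err := json.Marshal(payload)
	if err != nil {
		return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": err.Error()})
	}

	// Apply replicates the command; Error() blocks until a quorum has committed it.
	applyFuture := h.raft.Apply(data, 500*time.Millisecond)
	if err := applyFuture.Error(); err != nil {
		return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": err.Error()})
	}

	resp, ok := applyFuture.Response().(*fsm.ApplyResponse)
	if !ok {
		return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": "unexpected FSM response"})
	}

	return c.JSON(http.StatusOK, map[string]interface{}{"data": resp.Data})
}

// Get handles GET /store/:key: it reads from the local BadgerDB,
// which is why a Follower can serve (eventually consistent) reads.
func (h *handler) Get(c echo.Context) error {
	key := c.Param("key")

	var value []byte
	err := h.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(key))
		if err != nil {
			return err
		}
		value, err = item.ValueCopy(nil)
		return err
	})
	if err == badger.ErrKeyNotFound {
		return c.JSON(http.StatusOK, map[string]interface{}{"data": nil})
	}
	if err != nil {
		return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": err.Error()})
	}

	// The value was stored as JSON by the FSM, so decode it before returning.
	var data interface{}
	if err := json.Unmarshal(value, &data); err != nil {
		return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": err.Error()})
	}
	return c.JSON(http.StatusOK, map[string]interface{}{"data": data})
}

Note that the timeout passed to Apply only limits how long we wait for the command to be accepted for processing; applyFuture.Error() is what blocks until the quorum has actually committed the entry.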

This post would become too long if I explained every line of code, but I think the explanation above already covers the key points so you can start coding your own Raft cluster. For the full code, you can refer to this GitHub repository (while writing this post I used commit ID 45285b2, so you can git checkout 45285b2 yourself):

Testing

Testing is a critical part of software development; it ensures that our system works as expected. Let's do some manual testing of the software we just built. We will go through these steps:

  • Run the program.
  • Join all nodes to create the cluster.
  • Store some data, fetch it from the two other Follower nodes, and delete it.
  • Remove one Follower node and store some data, then rejoin the removed node to the cluster and try fetching the data from it.
  • Shut down one Follower node, store some data, start the terminated node again, and fetch the data from it.
  • Start a new node (say, node4), add it to the cluster, then fetch previously stored data from it.


Run the program

We will try creating a Raft cluster with 3 nodes: 1 Leader and 2 Followers. As you may have learned elsewhere (or, again, in the Consul documentation), with 3 servers at least 2 nodes need to say "okay" when the Leader broadcasts raft.Apply before the Leader itself can return "okay". Those two nodes can be the Leader itself plus at least 1 other Follower in the cluster.

We create 3 nodes because we also want to check Raft's consistency when one node has a problem. With 3 nodes, Raft has a failure tolerance of 1.

To run the 3 nodes, run this command in different terminal tab:

$ SERVER_PORT=2221 RAFT_NODE_ID=node1 RAFT_PORT=1111 RAFT_VOL_DIR=node_1_data go run ysf/raftsample/cmd/api
$ SERVER_PORT=2222 RAFT_NODE_ID=node2 RAFT_PORT=1112 RAFT_VOL_DIR=node_2_data go run ysf/raftsample/cmd/api
$ SERVER_PORT=2223 RAFT_NODE_ID=node3 RAFT_PORT=1113 RAFT_VOL_DIR=node_3_data go run ysf/raftsample/cmd/api
Run the program in three different tabs

Now, check each server: all three of them should report that they are the Leader:

http://localhost:2221/raft/stats is the Leader

Join all nodes to create the cluster

Now, we manually pick http://localhost:2221 (node1) as the Leader, and we want to join the other nodes as Followers. In Raft, we have already marked each node with a distinct node ID:

  • Node ID: node1
    Raft RPC Port: 1111
    HTTP Port: 2221
  • Node ID: node2
    Raft RPC Port: 1112
    HTTP Port: 2222
  • Node ID: node3
    Raft RPC Port: 1113
    HTTP Port: 2223

To join node2 as a Follower to node1 as the Leader, we can cURL POST /raft/join on node1 with the node ID and the Raft RPC address of node2:

curl -X POST 'http://localhost:2221/raft/join' \
-H 'Content-Type: application/json' \
--data-raw '{
"node_id":"node2",
"raft_address":"localhost:1112"
}'

Likewise, we join node3 to node1, which is currently elected as the Leader:

curl -X POST 'http://localhost:2221/raft/join' \
-H 'Content-Type: application/json' \
--data-raw '{
"node_id":"node3",
"raft_address":"localhost:1113"
}'

Please note that the raft_address in the JSON body uses only localhost and the port, without http://. This is because the Raft address is used for RPC calls, not HTTP.

Now, check http://localhost:2221/raft/stats again and you will see that latest_configuration contains all three servers. At this point, the Raft cluster is set up. We can try to store, fetch, and delete data now.

Raft Cluster created

Store some data, fetch it from the two other Follower nodes, and delete it

We already know that the Leader is node1, with HTTP address http://localhost:2221. To store new data, we can use this cURL:

curl -X POST 'http://localhost:2221/store' \
-H 'Content-Type: application/json' \
--data-raw '{
"key": "foo",
"value": "bar"
}'
Success persisting data

Then, try to fetch the data from the 2 other Follower nodes:

curl -X GET 'http://localhost:2222/store/foo'
curl -X GET 'http://localhost:2223/store/foo'
Success fetching data from node2 and node3

This shows that the 2 other nodes (node2 and node3) have received and persisted the data we sent to the Leader node (node1).

Now, try to delete the data (in this example, the key foo), then retry the fetch commands above. They should show that the value in the response is null.

curl -X DELETE 'http://localhost:2221/store/foo'
Delete data

When we fetch the data again, it shows that the key foo is not found in the storage.

Error when fetching non-existent or deleted data

Remove one Follower node and store some data, then rejoin the removed node to the cluster and try fetching the data from it

Now, try to remove node3 from the cluster. Again, joining or removing a node must be done through the Leader, so send the request to node1.

curl -X POST 'http://localhost:2221/raft/remove' \
-H 'Content-Type: application/json' \
--data-raw '{
"node_id":"node3",
"raft_address":"localhost:1113"
}'
Removing node3 from the cluster

Check the status at http://localhost:2221/raft/stats: it now shows that the cluster only consists of node1 and node2.

node3 already removed from latest_configuration

Now, post data to the Leader (node1) using the key removednode and try fetching it from node2:

curl -X POST 'http://localhost:2221/store' \
-H 'Content-Type: application/json' \
--data-raw '{
"key": "removednode",
"value": "my value"
}'
curl -X GET 'http://localhost:2222/store/removednode'
Post on node1 and fetch on node2 successfully synced the data

Now, join node3 to node1 again. And oops: leadership lost while committing log!

Failure when trying to rejoin the removed node (node3)

I don't know exactly what happened, but when I tried it a second time it succeeded. Please comment below if you know why.

Rejoining node3 to the cluster succeeds

Now, try fetching the data from node3. Yeay! It is already synced with the Leader node!

curl -X GET 'http://localhost:2223/store/removednode'
Fetching data from node3 after rejoining the cluster shows the synced data.

Shut down one Follower node, store some data, start the terminated node again, and fetch the data from it

This is the same as the previous test case, but now, instead of explicitly removing node3 from the cluster, we shut it down.

Let's shut node3 down!

node3 is shutting down

Once you terminate node3, node1 (the Leader) will notice and log an error in the console that looks like this:

2020-05-29T17:17:55.816+0700 [DEBUG] raft: failed to contact: server-id=node3 time=1m37.425816508s

Then, store data to node1 and fetch it from node2:

curl -X POST 'http://localhost:2221/store' \
-H 'Content-Type: application/json' \
--data-raw '{
"key": "problemnode",
"value": "hello world"
}'
curl -X GET 'http://localhost:2222/store/problemnode'
Success persisting data even with one failed node.

Now, start node3 again with the same configuration (volume directory, Raft RPC port, HTTP port), so it uses its last state before it was terminated.

$ SERVER_PORT=2223 RAFT_NODE_ID=node3 RAFT_PORT=1113 RAFT_VOL_DIR=node_3_data go run ysf/raftsample/cmd/api

After it starts again, node1 will be notified that node3 is back up:

2020-05-29T17:20:58.043+0700 [ERROR] raft: peer has newer term, stopping replication: peer="{Voter node3 localhost:1113}"
2020-05-29T17:20:58.043+0700 [INFO] raft: entering follower state: follower="Node at :1111 [Follower]" leader=
2020-05-29T17:20:58.043+0700 [INFO] raft: aborting pipeline replication: peer="{Voter node2 localhost:1112}"
2020-05-29T17:20:58.174+0700 [DEBUG] raft: lost leadership because received a requestVote with a newer term
2020-05-29T17:20:58.216+0700 [WARN] raft: rejecting vote request since our last index is greater: candidate=:1113 last-index=12 last-candidate-index=11
2020-05-29T17:20:59.659+0700 [WARN] raft: heartbeat timeout reached, starting election: last-leader=
2020-05-29T17:20:59.659+0700 [INFO] raft: entering candidate state: node="Node at :1111 [Candidate]" term=265
2020-05-29T17:20:59.720+0700 [DEBUG] raft: votes: needed=2
2020-05-29T17:20:59.720+0700 [DEBUG] raft: vote granted: from=node1 term=265 tally=1
2020-05-29T17:20:59.832+0700 [DEBUG] raft: vote granted: from=node2 term=265 tally=2
2020-05-29T17:20:59.833+0700 [INFO] raft: election won: tally=2
2020-05-29T17:20:59.833+0700 [INFO] raft: entering leader state: leader="Node at :1111 [Leader]"
2020-05-29T17:20:59.833+0700 [INFO] raft: added peer, starting replication: peer=node2
2020-05-29T17:20:59.833+0700 [INFO] raft: added peer, starting replication: peer=node3
2020-05-29T17:20:59.833+0700 [INFO] raft: pipelining replication: peer="{Voter node2 localhost:1112}"
2020-05-29T17:20:59.843+0700 [WARN] raft: appendEntries rejected, sending older logs: peer="{Voter node3 localhost:1113}" next=12
2020-05-29T17:20:59.881+0700 [INFO] raft: pipelining replication: peer="{Voter node3 localhost:1113}"

Then, try to fetch the data from node3:

curl -X GET 'http://localhost:2223/store/problemnode'
node3 also has the latest data after it comes back up

As expected: just as in the remove-and-rejoin scenario, where the data synced up again after the node lost contact, here too the Follower node catches up with the Leader node even after a failure.

Start a new node (say, node4), add it to the cluster, then fetch previously stored data from it

Start the new node:

SERVER_PORT=2224 RAFT_NODE_ID=node4 RAFT_PORT=1114 RAFT_VOL_DIR=node_4_data go run ysf/raftsample/cmd/api
Starting new node4

Then join it to the cluster and fetch the data from it:

curl -X POST 'http://localhost:2221/raft/join' \
-H 'Content-Type: application/json' \
--data-raw '{
"node_id":"node4",
"raft_address":"localhost:1114"
}'
curl -X GET 'http://localhost:2224/store/problemnode'

You will see that the data we stored earlier under the key problemnode with the value hello world can be fetched from node4.

Join node4 to node1, then fetch the previously stored key `problemnode` from node4.

Done is done!

Conclusion

Creating a distributed system is easier now than it used to be, because there are many libraries providing Raft consensus, such as hashicorp/raft or dragonboat. Although Raft is not the only consensus algorithm we can use, it is an alternative that is easier to understand than Paxos. As you follow this post, you will see that implementing Raft (using hashicorp/raft) is as easy as implementing raft.FSM with your own logic. Raft consistently replicates the data to newly joined nodes, so building a distributed system on top of this consensus is generally safe.

28–30 May 2020
Yogyakarta, Indonesia
