Streaming Data Made Easy with Benthos

Yusuf Syaifudin
11 min read · May 6, 2022


Benthos logo

I have sometimes heard the question "What if we unified our pub/sub system so that developers are agnostic to Kafka?", and this is the second time I have heard it. I sometimes wonder why we avoid interacting with Kafka so much when the concept is rather simple: publish (from your producers) and subscribe (on your consumers). But when it comes to building consumers, the work is repetitive and boring: we create a consumer service, add some logic, and so on. At some point we also need extra care, because sometimes the same data must be processed by different services (fan-out), or a message cannot be processed and we need a Dead Letter Queue, or we need a retry mechanism. And we do all of this for every service that consumes Kafka.

Maybe we can solve this by creating a template, but another approach is to use an ETL tool such as Benthos. In this article I will show you what Benthos is and how to work with it.

What is Benthos?

Benthos is a declarative data streaming service that solves a wide range of data engineering problems with simple, chained, stateless processing steps. It implements transaction based resiliency with back pressure, so when connecting to at-least-once sources and sinks it’s able to guarantee at-least-once delivery without needing to persist messages during transit.

Source: https://v4.benthos.dev/docs/about, retrieved on May 6th, 2022
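To get a feel for what "declarative" and "chained, stateless processing steps" mean in practice, here is a minimal config in the spirit of the examples from the Benthos docs: it reads lines from stdin, uppercases them with a Bloblang processor, and writes the result to stdout. This snippet is only illustrative and is not part of the sample project used below.

input:
  stdin: {}

pipeline:
  processors:
    # uppercase the raw content of every message
    - bloblang: root = content().uppercase()

output:
  stdout: {}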

How can we use Benthos to stream our data?

The idea of "getting rid of Kafka" from the developer's point of view usually means converting the asynchronous data processing (in Kafka) into a REST API interface, rather than creating a standard template for building Kafka consumer services. In simple words, we create one middleware system that subscribes to messages from a list of Kafka topics and then routes them to the destination services by calling their REST APIs.
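As a rough sketch of that middleware idea (the topic name and URL here are illustrative, not the actual configuration used later in this article), a single Benthos stream that subscribes to a Kafka topic and forwards every message to a REST endpoint could look like this:

# consume from Kafka and forward each message as an HTTP POST
input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - walletservice
    consumer_group: benthos_consumer

output:
  http_client:
    url: http://localhost:8081/
    verb: POST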

Because REST APIs are synchronous and we lose the ability to stop reading from Kafka when processing fails (by withholding the acknowledgement to Kafka), we need to add a retry mechanism (back-off retry, or re-publishing the message so it can be processed later) and a Dead Letter Queue for messages that still fail.

However, not every use-case can handle a "delayed message"; some messages, such as those for a journal or transaction system, need to keep the order of incoming messages. But if a message still fails after consecutive retries, we can drop it and send it to a DLQ to be reviewed manually by a human.

So, here are the use-case and requirements that I want to show you in this article:

  1. I will create an oversimplified wallet system where a user can top up and spend money in a specified walletID. The system returns an error if we try to spend money without sufficient balance.
  2. Incoming data must reach the REST API in sequential order. The order must be exactly the same as the offsets in Kafka, because our source of truth is the Kafka topic.
  3. Apart from sending the message to the Wallet Service, we need to send it to a journal system that records each incoming message. This journaling system is useful for analytics, auditing, or accounting. To minimize errors, we save the journal data to a file output.
  4. If incoming data fails to be processed or the system is down, we need to retry the request.
  5. If consecutive retries still fail, we need to save the message to a specific file as the final DLQ.

From the requirements above we know that point 1 needs a distributed lock (not explained here because it is out of scope). Points 2 and 3 mean we need fan-out message delivery: sending data to the Wallet Service via REST API and writing to the journal file can be done concurrently, as long as the order sent to both systems is the same. Points 4 and 5 mean we need to implement back-off retries and a DLQ for data that fails to process.

To keep this article short, I will not walk through the setup step by step as usual. Instead, I will show how to run it and check whether the retry mechanism and DLQ are supported by Benthos.

First, run Kafka, Redis, and the Wallet Service (written in Go) using docker compose and the command `go run main.go wallet-server -p 8081 --redis-addr localhost:6379 --debug`. You can also use the pre-built binaries downloaded from https://github.com/yusufsyaifudin/benthos-sample/releases/tag/v1.0.0-alpha
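The sample repository ships its own compose setup, so the following is only a hedged sketch of what a minimal docker-compose.yaml for the Kafka and Redis dependencies might look like (the Wallet Service itself runs on the host via the go run command above); the image names and environment variables are my assumptions, not the repository's actual file:

version: "3.8"
services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: bitnami/kafka:3.1
    ports:
      - "9092:9092"   # reachable from the host for kcat and Benthos
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  redis:
    image: redis:6
    ports:
      - "6379:6379"   # matches --redis-addr localhost:6379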

Seed data into Kafka using:

$ cd golang
$ sudo chmod -R 757 wallet_seed_kafka.sh
$ ./wallet_seed_kafka.sh wallet_kafka.txt
Seed data into Kafka topic: walletservice

In the script wallet_seed_kafka.sh I publish every message to partition 0. This is because if we use multiple partitions in Kafka, we will get out-of-order messages on the consumer side, which is not good for a transaction system that needs strong ordering.

Imagine we push data:

DATA 1 => GOES TO PARTITION 0
DATA 2 => GOES TO PARTITION 1
DATA 3 => GOES TO PARTITION 2
DATA 4 => GOES TO PARTITION 3
DATA 5 => GOES TO PARTITION 0
DATA 6 => GOES TO PARTITION 1
DATA 7 => GOES TO PARTITION 2
DATA 8 => GOES TO PARTITION 3

or represented like this:

Partition 0 => [1, 5]
Partition 1 => [2, 6]
Partition 2 => [3, 7]
Partition 3 => [4, 8]

When a consumer subscribes to these messages using a consumer group, they may arrive out of order. The consumer may receive the data in this order: [1, 3, 2, 4, 8, 7, 6, 5], because Kafka only maintains ordering within the same partition. To avoid this behavior, the data is published specifically to partition 0 (Kafka partitions are indexed starting from 0).

Then we can start the Benthos streams:

benthos -c simple/config/benthos.yaml -r "simple/resources/*.yaml" streams simple/streams/*.yaml

When we run this command it performs actions similar to the requirements I defined above:

  1. Consume messages from Kafka and try to add Kafka metadata using the processor proc_wallet_service_add_meta (a sketch of what such a processor might look like appears below).
  2. If the processor proc_wallet_service_add_meta fails, for example because of invalid JSON, catch the error using the errored() check in the switch case and route the message to out_file_wallet_service_proc_error.
  3. If the processor proc_wallet_service_add_meta succeeds, then:
  4. Send the message both to a file (the journaling log) using out_file_wallet_service and to the Wallet Service at http://localhost:8081 using the resource out_http_wallet_service. Writing to the journal log file (out_file_wallet_service) should not return any error (in production we might send it to standard output and pipe it into another service such as Elasticsearch or a database to minimize errors). Sending over HTTP is prone to errors, so we need a `fallback` mechanism.
  5. If sending to the HTTP service still fails after 3 consecutive retries, send the message to the DLQ out_file_wallet_service_error.

The processed messages must stay in order.
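The processor resource itself is not shown in this article, but based on the fields that show up in the output later (kafkaOffset, kafkaPartition, reqID, time) and the Bloblang error we will see, a proc_wallet_service_add_meta resource could look roughly like this. This is a guess for illustration, not the exact file from the repository:

processor_resources:
  - label: proc_wallet_service_add_meta
    bloblang: |
      # "root = this" fails when the payload is not valid JSON,
      # which is what triggers the errored() case later on
      root = this
      root.kafkaOffset = meta("kafka_offset")
      root.kafkaPartition = meta("kafka_partition")
      root.reqID = ksuid()
      root.time = now()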

You can see that the final amount in wallet b is 49049.

  • The wallet dataset contains 1001 lines of data; in each group, the first 10 entries are top-ups with amounts 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, followed by one "spend" with an amount of 11.
  • This means we have 91 repetitions of that pattern (1001 / 11 = 91).
  • It also means the total top-up amount is (10 + 20 + 30 + 40 + 50 + 60 + 70 + 80 + 90 + 100) * 91 = 50050.
  • And the total spending amount is 11 * 91 = 1001.
    The final value is (50050 - 1001) = 49049. This amount, 49049, must be the final value we get from the API.

To double-check, query the API with cURL:

curl -X POST 'localhost:8081/amount' -H 'Content-Type: application/json' --data-raw '{"walletID":"b"}' | jq .
Response after checking wallet b

Testing errored data: it must go to the DLQ

Right now we only have one file, named service_wallet_out.txt, because there were no errors either in the Wallet Service or in the message processor.

We can validate that all data/messages consumed by Benthos from Kafka were passed to the Wallet Service. To verify that the order is right, use jq and a bash script:

$ sudo chmod -R 757 golang/check_sorted.sh
$ ./golang/check_sorted.sh service_wallet_out.txt

It should not return anything; if the order is wrong it will show a message like this:

kafkaOffset: prev offset 999 larger than current offset 100

Produce invalid data

Now we try to produce data that is not a valid JSON string. The following data is missing a " right after the { for the key walletID:

echo '{walletID":"b", "operation": "topup", "amount": "10"}' | kcat -P -t walletservice -b localhost:9092 -p 0

We will now have a new file, service_wallet_proc_error.txt, with content like this:

{"content":"{walletID\":\"b\", \"operation\": \"topup\", \"amount\": \"10\"}","error":"failed assignment (line 1): unable to reference message as structured (with 'this'): parse as json: invalid character 'w' looking for beginning of object key string","meta":{"kafka_lag":"0","kafka_offset":"1001","kafka_partition":"0","kafka_timestamp_unix":"1651819063","kafka_topic":"walletservice"},"reqID":"28mQ1MGrZ7AWzGwil2BC7VJKIam","time":"2022-05-06T13:37:44.298612+07:00"}
Benthos log when the processor fails

The file service_wallet_proc_error.txt acts as the DLQ when the processor errors. In this case, the processor failed while adding the Kafka metadata because the payload is not a valid JSON string:

failed assignment (line 1): unable to reference message as structured (with 'this'): parse as json: invalid character 'w' looking for beginning of object key string

The data is never passed to the Wallet Service because processing stops when the processor errors.

Produce valid JSON data that the Wallet Service cannot process

Now, we try to produce data that cannot be processed by the Wallet Service.

echo '{"walletID":"b", "operation": "topup", "amount": "10"}' | kcat -P -t walletservice -b localhost:9092 -p 0

The payload may look valid, but it actually is not: the Wallet Service expects amount to be a number, but we send it as a string.

Now, look at the Wallet Service's logs. They show that the first call fails, then Benthos retries every 3 seconds, up to 3 times. We can validate this by checking that the reqID value stays the same for all 4 requests.

When a call errors, Benthos retries up to the maximum retry count defined in the configuration file. When all retries are done and the call still fails, the message goes to the DLQ.
2022/05/06 13:47:09 {"path":"/","request":{"header":{"Accept-Encoding":"gzip","Content-Length":"172","Content-Type":"application/json","User-Agent":"Go-http-client/1.1"},"data_object":{"amount":"10","kafkaOffset":"1002","kafkaPartition":"0","operation":"topup","reqID":"28mRAGUy6nWVpSrw41rFHy8vsNj","time":"2022-05-06T13:47:09.758318+07:00","walletID":"b"}},"response":{"header":{"Content-Type":"application/json"},"data_object":{"error":"failed read request body: json: cannot unmarshal string into Go struct field ReqData.amount of type int64"}}}
2022/05/06 13:47:12 {"path":"/","request":{"header":{"Accept-Encoding":"gzip","Content-Length":"172","Content-Type":"application/json","User-Agent":"Go-http-client/1.1"},"data_object":{"amount":"10","kafkaOffset":"1002","kafkaPartition":"0","operation":"topup","reqID":"28mRAGUy6nWVpSrw41rFHy8vsNj","time":"2022-05-06T13:47:09.758318+07:00","walletID":"b"}},"response":{"header":{"Content-Type":"application/json"},"data_object":{"error":"failed read request body: json: cannot unmarshal string into Go struct field ReqData.amount of type int64"}}}
2022/05/06 13:47:15 {"path":"/","request":{"header":{"Accept-Encoding":"gzip","Content-Length":"172","Content-Type":"application/json","User-Agent":"Go-http-client/1.1"},"data_object":{"amount":"10","kafkaOffset":"1002","kafkaPartition":"0","operation":"topup","reqID":"28mRAGUy6nWVpSrw41rFHy8vsNj","time":"2022-05-06T13:47:09.758318+07:00","walletID":"b"}},"response":{"header":{"Content-Type":"application/json"},"data_object":{"error":"failed read request body: json: cannot unmarshal string into Go struct field ReqData.amount of type int64"}}}
2022/05/06 13:47:18 {"path":"/","request":{"header":{"Accept-Encoding":"gzip","Content-Length":"172","Content-Type":"application/json","User-Agent":"Go-http-client/1.1"},"data_object":{"amount":"10","kafkaOffset":"1002","kafkaPartition":"0","operation":"topup","reqID":"28mRAGUy6nWVpSrw41rFHy8vsNj","time":"2022-05-06T13:47:09.758318+07:00","walletID":"b"}},"response":{"header":{"Content-Type":"application/json"},"data_object":{"error":"failed read request body: json: cannot unmarshal string into Go struct field ReqData.amount of type int64"}}}

Note the time values, which increase by 3 seconds: the next request is made 3 seconds after the previous call fails.
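Those retry values live in the output resources, which are not shown in this article. Here is a hedged guess at what the HTTP output and its DLQ file might look like, matching the 3 retries and 3-second interval observed above; the field values and structure are my assumptions, not the repository's exact file:

output_resources:
  - label: out_http_wallet_service
    http_client:
      url: http://localhost:8081/
      verb: POST
      retries: 3        # maximum number of retry attempts
      retry_period: 3s  # wait 3 seconds between attempts

  - label: out_file_wallet_service_error
    file:
      path: service_wallet_error.txt
      codec: lines

Because out_http_wallet_service sits inside the `fallback` output in the stream configuration, the message is only handed to out_file_wallet_service_error after the http_client output gives up.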

After 4 failed requests (the first request plus 3 retries), Benthos saves the failed request to the file service_wallet_error.txt. The content of service_wallet_error.txt will look like this:

{"amount":"10","kafkaOffset":"1002","kafkaPartition":"0","operation":"topup","reqID":"28mRAGUy6nWVpSrw41rFHy8vsNj","time":"2022-05-06T13:47:09.758318+07:00","walletID":"b"}

Benthos also saves the message to service_wallet_out.txt regardless of whether the retries succeed, since this file serves as the journal log. The content of service_wallet_out.txt is appended with the new data:

{"amount":"10","kafkaOffset":"1002","kafkaPartition":"0","operation":"topup","reqID":"28mRAGUy6nWVpSrw41rFHy8vsNj","time":"2022-05-06T13:47:09.758318+07:00","walletID":"b"}

The kafkaOffset is 1002 because offset 1001 was used by the previous message, when we published the non-valid JSON string.

Lessons learned

While trying Benthos, a few thoughts came to mind (please share yours too if you have any):

If your use-case is simple, i.e. just consuming data and then processing it (like a worker service), you can use Benthos. But beware if you have multiple topics and complex routing, for example one Kafka cluster for all topics where each topic must be routed to a different service. If each route needs its own retry configuration, DLQ, and so on, the configuration becomes complex and may result in cognitive overhead when reading and debugging the YAML files.

In my opinion, it is better to deploy a specific Benthos configuration for each specific use-case. If you define all resources in one place and then define each stream per node, you will get wrong routing.

When we define an input in the resources (in my case a Kafka input) and then use it multiple times (in the stream configurations), one stream per service, the message routing does not work as expected. Sometimes a message from topic A is still routed to service B even though we already defined a switch check.

I mean, when we define this input resource:

# define input resources
input_resources:
  - label: in_kafka_service
    kafka:
      addresses:
        - localhost:9092
      topics:
        - calcservice
        - walletservice
      consumer_group: "benthos_consumer12"
      checkpoint_limit: 1

Then create two files:

First, one named service_calc.yaml:

# doing pipeline
input:
  resource: in_kafka_service

output:
  switch:
    cases:
      - check: meta("kafka_topic") == "calcservice"
        output:
          broker:
            pattern: fan_out
            outputs:
              - resource: out_file_calc_service
              - resource: out_http_calc_service
          processors:
            - resource: proc_calc_service_json

      - check: errored()
        output:
          resource: out_file_calc_service_error

Second, one named service_wallet.yaml:

# doing pipeline
input:
  resource: in_kafka_service

output:
  switch:
    cases:
      - check: meta("kafka_topic") == "walletservice"
        output:
          broker:
            pattern: fan_out
            outputs:
              - resource: out_file_wallet_service
              - fallback:
                  - resource: out_http_wallet_service
                  - resource: out_file_wallet_service_error #DLQ HTTP

      - check: errored()
        output:
          resource: out_file_wallet_service_error #DLQ proc error
  processors:
    - resource: proc_wallet_service_add_meta
If we don't know how Kafka consumer groups work, we might expect each message to be routed properly to the Calc and Wallet services because we already defined the switch checks. But it is not! Since both streams share the same input resource and consumer group, Benthos consumes all topics under that consumer group and may hand data from topic walletservice to the Calc stream. The data stops there; it is not fanned out to all streams. This is the behavior I got with Benthos version 4.0.0 (the same version I used while writing this article).

benthos -v
Version: 4.0.0
Date: 2022-04-21T08:35:06Z
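One way to avoid the routing surprise described above, in line with the "one Benthos deployment per use-case" opinion earlier, is to stop sharing a single input resource and give each stream its own Kafka input, scoped to its own topic (and, if you like, its own consumer group). A hedged sketch, with labels and group names of my own choosing:

input_resources:
  # one input per stream, so a calcservice message can never
  # be delivered to the wallet stream in the first place
  - label: in_kafka_calc_service
    kafka:
      addresses:
        - localhost:9092
      topics:
        - calcservice
      consumer_group: "benthos_consumer_calc"
      checkpoint_limit: 1

  - label: in_kafka_wallet_service
    kafka:
      addresses:
        - localhost:9092
      topics:
        - walletservice
      consumer_group: "benthos_consumer_wallet"
      checkpoint_limit: 1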

Also, service_calc.yaml and service_wallet.yaml are neither erroring nor really valid. When I ran them, the check: errored() case was never called; when I built simpler streams I learned that the error check must be the first case (or perhaps I need to add continue, as documented at https://v4.benthos.dev/docs/components/outputs/switch/#casescontinue ?). Regardless of the exact fix, using Benthos means learning its configuration model. Again, if we only use a small subset of Benthos features, that is fine. But if we are going to build a complex routing system for a single type of input (for example Kafka) and a single output (e.g. HTTP), and save all failed messages into a shared DLQ (with the data partitioned by tag), I think it is worth reconsidering.
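Coming back to the errored() ordering issue, here is a sketch of the arrangement that matches what I observed in the simpler streams, with the error check placed first so that failed messages cannot be claimed by the topic check below it. It is trimmed to the routing-relevant parts, and whether cases[].continue is the more idiomatic fix remains the open question above:

output:
  switch:
    cases:
      # evaluate the error check first so failed messages go straight to the DLQ
      - check: errored()
        output:
          resource: out_file_wallet_service_error

      # everything else is routed by topic as before
      - check: meta("kafka_topic") == "walletservice"
        output:
          resource: out_http_wallet_service
  processors:
    - resource: proc_wallet_service_add_meta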

Yogyakarta
Friday, May 6th, 2022 03:00PM

Ied Al-Fitr 1443 Hijr Mubarak!
