Use .buffer() with akka-stream-kafka

Posted by Boris Stumm on Monday, 4 December 2017

In our current project we use the akka-stream-kafka library to connect Kafka with our application. While toying around with our setup, I noticed that messages did not come in as a steady stream and that the overall throughput was poor. It turned out that Consumer.committableSource did not buffer on its own: it polled again only when there were no more messages left to process from the previous poll. That might be reasonable behavior, but it took me a while to realize it. To let the messages come in faster, I added a .buffer() to my Akka stream:

Consumer
  .committableSource(settings, Subscriptions.topics("topic1"))
  .buffer(1500, OverflowStrategy.backpressure)

As long as the buffer is not full, Akka Streams keeps reading from the source to fill it, and while the source is polling, our application just keeps reading from the filled buffer.
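
For context, here is a minimal sketch of how the buffered source might sit in a complete stream. It is not our production code. The broker address, group id, and the handle function are placeholders I made up for illustration, and the sketch assumes a recent Alpakka Kafka release (1.0 or later, for Committer.sink) on Akka 2.6+, where the implicit ActorSystem also provides the materializer. On older versions you would need an explicit ActorMaterializer.

```scala
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.stream.OverflowStrategy
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object BufferedConsumer extends App {
  // Assumption: Akka 2.6+, so the implicit system doubles as the materializer.
  implicit val system: ActorSystem = ActorSystem("buffered-consumer")
  import system.dispatcher

  // Placeholder connection details; adjust to your environment.
  val settings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")

  // Hypothetical processing step standing in for the real application logic.
  def handle(value: String): Future[Unit] = Future(println(value))

  Consumer
    .committableSource(settings, Subscriptions.topics("topic1"))
    // Up to 1500 messages can wait here; the source is backpressured
    // only once the buffer is full.
    .buffer(1500, OverflowStrategy.backpressure)
    // Process one message at a time, then pass its offset on for committing.
    .mapAsync(1)(msg => handle(msg.record.value()).map(_ => msg.committableOffset))
    .runWith(Committer.sink(CommitterSettings(system)))
}
```

The buffer sits directly after the source so that everything downstream (processing and committing) is decoupled from the polling.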

For best results, you'll have to play around a bit with the size of the buffer and the value of ConsumerConfig.MAX_POLL_RECORDS_CONFIG (Kafka's max.poll.records setting), which caps how many records a single poll returns.
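
As a rough sketch of where those two knobs live: max.poll.records can be set through withProperty on the consumer settings, and the buffer size is simply the first argument to .buffer(). The numbers below are illustrative starting points and not recommendations. The broker address and group id are again placeholders.

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object TunedSettings {
  // Illustrative values only; measure with your own workload.
  val maxPollRecords = 500 // Kafka's default for max.poll.records
  val bufferSize     = 1500 // pass this as the first argument to .buffer(...)

  def consumerSettings(system: ActorSystem): ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("group1")                  // placeholder
      // Caps how many records a single poll may return.
      .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
}
```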