What’s Wrong with Kafka Partitioner?
Apache Kafka is a great event streaming platform with providing highly scalable infrastructure and offers several tools in its ecosystem. It enables parallel processing over messages, meaning that you can partition a topic into several parallel pipelines and process messages at scale.
The more parallelism you apply with partitions, the more confusion you get on the consumption order of events if it matters. Each event is represented by a pair of key-value. To have events in order, a key must be set to make Kafka send events to the same partition. If a key not provided, Kafka will share the events using round-robin among all partitions. But, Does this solve all problems?
How Kafka Event Key Works
It’s so simple but let me first quickly explain how I would expect this partition deciding process. Assuming that we have a topic with two partitions, we are sending events with the <key, value>
pairs respectively,
<202, event_1000>, <101, event_999>, ... <101, event_1> --> [Topic]
What I would expect from Kafka is that it would start keeping a lookup table of Key-Partition assignments during round-robin and after each incoming event, it fetches the partition value with respect to the event key. If the partition is null then it sends the event using round-robin then appends the current partition to the table along with the key. So there would be a lookup table as;
[101 -> 0, 202 -> 1]
and events would be distributed as;
Partition-0 -> [event_1, ..., event_999]
Partition-1 -> [event_1000]
But this would be more exhausting for Kafka. That’s why it uses a hashing algorithm called Murmur2 for partition decisions. It simply hashes the key and applies the modulus operator with partition count.
Finally, the remainder is the partition number of the event. So, in practice the distribution of events is as follows;
/-> Partition-0 <- [event_1, ..., event_999, event_1000]
Consumer --|
\-> Partition-1 <- []
Assume that there are numerous events with the key “101” which are waiting for being consumed and caused lags. Afterward, you send a couple of events with the key “202”.
In this scenario, Will new events having the key “202” have to wait for a long long time for old events while partition-1
is always empty? That's why we have to improve the justice of our Kafka, maybe using a gavel, or maybe Kafka Streams.
How to Balance The Load in Kafka Topic
You can implement a CustomPartitioner using the Partitioner interface in Producer API for your specific need. But seems that it is a bit different from our scenario and I’m not sure if there is a way of injecting a dynamic process into it like a lag checker.
To make our topics/partitions be more balanced, there are three tools that we can benefit from. Before sending the event, first, we query the topic with a key to find out to which partition we sent past events with the same key before. If our Stream Processing API returns a partition, then send the event to this partition with the key, otherwise query the Burrow, our Lag Checker, to specify the partition number which has the lowest lag.
- Kafka Streams API enables us to send interactive queries, directly to clusters.
- Burrow will guide us to decide which partition is more reasonable.
- Kafka Producer API enables us to send events to a specific partition even with a key.
1 Streams API, yet another part of the Apache Kafka ecosystem, is a Java-based library using an abstraction over topics. It simply works on the client-side without extra configurations on the clusters to process events streaming through Kafka. It has the same partitioning ability which also makes it highly scalable.
2Burrow is a monitoring service that reports information about topics in the Kafka clusters.
We only need it for obtaining partition lags for consumers and consumer groups in detail. Here, we will use Burrow to find out which partition is more reasonable to send events having different keys than previous ones. We can get current_lag
fields from partitions
list and find the best partition number having the lowest lag.
You can also implement an alert manager or auto-scaler service using it since it shows various metrics about the cluster.
3Finally, we need the Kafka Producer API of course. It gives us an option to set a partition number where the event will go through. We should not leave Kafka the partition decision process. Instead, we need to handle this process to achieve more balanced partitions in a topic. To help our Streams API successfully find the partition of the key, we always have to set a key to avoid nulls which are ignored by Kafka Streams API.
Some Terminologies About Kafka Streams
Let’s first mention some terms frequently used in Streams API, such as topology, state stores, interactive queries, materialized view.
Topology: Overall graph or bundle of processor logics registered and connected in streams.
State Store: Holds the key-value mappings produced by the topology in a stream, updates records as new events come, and allows to query local data. There is a need for the state if the streaming instance creates a dependency between events, otherwise not. States are stored in each streaming instance's local storage called RocksDB, you can set the storage to in memory though.
State stores can not be manipulated by stream clients, only topology processors are granted to make changes on them. For each store, Kafka keeps a replicated changelog of the topic. When an existing streaming app is down or a new one is registered with the same ID to scale out, Kafka Streams API starts rebalancing the partitions and local state stores using its changelog.
Interactive Queries: Makes the state-stores queryable. In general, the output of topology processors is directly pushed to another topic without a state. But interactive queries enable to query of the full state stores, even in local or remote stores. Kafka Streams API records the metadata of each event using KeyQueryMetadata which holds host information, partition number where events come from.
If the key is not found in the local state store, Kafka Streams API tries to find the remote host via the RPC layer and query the key on the remote state store. So, it ends up creating a network of local state-store as a full state-store table.
Materialized Views: Like a snapshot of the state-store tables. Once an event comes into a stream, processors in the topology will run and the materialized view refreshes the snapshot, not from the beginning but applying only the change. Operations in materialized views are incremental, not all over. To make a state-store queryable, it must be materialized.
Configurations: There are some important settings for a stream to work well.
How to Send Interactive Queries and Fetch Data
Kafka Streams API provides several functions to manipulate events. For our purpose, we only need key-value pairs of already sent events. That’s why creating a basic state-store is enough to query partition info of past events. This topology simply catches each event and saves them as a key-value pair. We simply tell Stream API what the topic name is and where to store states. In the example below, our key is String
and the event is MyEvent
. Since KTable
is like a form of a key-value map, it always stores the latest event with the same key.
By the way, for each processor, Kafka Streams API requires aSerde
which is a wrapper class combining the serializer and deserializer for the same data type. A Schema Registry would be better to prevent the streaming apps and consumers from crushing because it checks the structure of incoming events and rejects unexpected events on the producer side.
A stream has six states and we should query only when the stream is in Running or Rebalancing states. Simply, set a system property and block rest requests by checking this property in a web interceptor.
As stated before, we said that Kafka Streams can be run in distributed mode. To query state stores, first, we make sure that which instance hosts the key by checking the metadata, then apply RPC via an HTTP request to get the remote event.
That’s all. It is easy but remains one point that is worth mentioning. Streaming through all events may be dangerous for the application memory as it persists the output in local RocksDB. That’s why it is recommended to apply some Windowing operations. It has different types that you can apply according to business concerns and retention period settings.
Finally, our topics will end up with more balanced partitions using the basic logic below;