Last year, Jay Kreps wrote a great article titled “It’s Okay to Store Data in Apache Kafka,” in which he discusses a variety of ways to use Kafka as a persistent store. In one of the patterns, Jay describes how Kafka can be used to back an in-memory cache:
You may have an in-memory cache in each instance of your application that is fed by updates from Kafka. A very simple way of building this is to make the Kafka topic log compacted, and have the app simply start fresh at offset zero whenever it restarts to populate its cache.
After reading the above, you may be thinking, “That’s what I want to do. How do I do it?”
I will now describe exactly how.
Roughly, there are two approaches.
In the first approach, you would:
- Create a compacted topic in Kafka.
- Create a Kafka consumer that never commits its offsets and always starts by reading from the beginning of the topic.
- Have the consumer initially read all records and insert them, in order, into an in-memory cache.
- Have the consumer continue to poll the topic in a background thread, updating the cache with new records.
- To insert a new record into the in-memory cache, use a Kafka producer to send the record to the topic, and wait for the consumer to read the new record and update the cache.
- To remove a record from the in-memory cache, use a Kafka producer to send a record with the given key and a null value (such a record is called a tombstone), and wait for the consumer to read the tombstone and remove the corresponding record from the cache.
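The steps above can be modeled without a broker. The following is a minimal, hypothetical sketch. It is neither KCache's implementation nor the Kafka client API. `InMemoryLog` stands in for a single-partition compacted topic, and compaction itself is not modeled. Its `append` plays the role of an acknowledged producer send that returns the record's offset. The background thread plays the role of a consumer that starts at offset 0 and never commits. All class and method names here are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LogBackedCacheDemo {
    /** Hypothetical log record; a null value is a tombstone. */
    static final class Entry { final String key, value; Entry(String k, String v) { key = k; value = v; } }

    /** Hypothetical stand-in for a single-partition topic (compaction is not modeled). */
    static final class InMemoryLog {
        private final List<Entry> entries = new ArrayList<>();

        /** Appends a record and returns its offset, like an acknowledged producer send. */
        synchronized long append(String key, String value) {
            entries.add(new Entry(key, value));
            notifyAll(); // wake a poller blocked in poll()
            return entries.size() - 1;
        }

        synchronized long endOffset() { return entries.size(); }

        /** Returns records from fromOffset onward, waiting up to timeoutMs if there are none yet. */
        synchronized List<Entry> poll(long fromOffset, long timeoutMs) throws InterruptedException {
            if (fromOffset >= entries.size() && timeoutMs > 0) wait(timeoutMs);
            return new ArrayList<>(entries.subList((int) fromOffset, entries.size()));
        }
    }

    /** Cache fed only by the log: writes go through the log, never straight into the map. */
    static final class LogBackedCache implements AutoCloseable {
        private final InMemoryLog log;
        private final Map<String, String> cache = new ConcurrentHashMap<>();
        private final Object applied = new Object(); // guards nextOffset
        private long nextOffset = 0; // never persisted: each new instance replays from offset 0
        private volatile boolean running = true;
        private final Thread poller;

        LogBackedCache(InMemoryLog log) throws InterruptedException {
            this.log = log;
            // Initial load: apply every record already in the log before serving reads.
            long end = log.endOffset();
            while (nextOffset < end) apply(log.poll(nextOffset, 0));
            // Background thread keeps applying new records as they arrive.
            poller = new Thread(() -> {
                try {
                    while (running) apply(log.poll(nextOffset, 100));
                } catch (InterruptedException e) {
                    // close() interrupts the poller to stop it
                }
            });
            poller.setDaemon(true);
            poller.start();
        }

        private void apply(List<Entry> batch) {
            for (Entry e : batch) {
                if (e.value == null) cache.remove(e.key); // tombstone deletes the key
                else cache.put(e.key, e.value);
                synchronized (applied) {
                    nextOffset++;
                    applied.notifyAll(); // wake writers waiting for this offset
                }
            }
        }

        String get(String key) { return cache.get(key); }
        void put(String key, String value) throws InterruptedException { appendAndWait(key, value); }
        void remove(String key) throws InterruptedException { appendAndWait(key, null); }

        /** Appends to the log, then blocks until the poller has applied that offset. */
        private void appendAndWait(String key, String value) throws InterruptedException {
            long offset = log.append(key, value);
            synchronized (applied) {
                while (nextOffset <= offset) applied.wait();
            }
        }

        @Override
        public void close() throws InterruptedException {
            running = false;
            poller.interrupt();
            poller.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InMemoryLog log = new InMemoryLog();
        try (LogBackedCache cache = new LogBackedCache(log)) {
            cache.put("Kafka", "Rocks");
            System.out.println(cache.get("Kafka")); // Rocks
            cache.remove("Kafka");
            System.out.println(cache.get("Kafka")); // null
            cache.put("Kafka", "Still rocks");
        }
        // A "restarted" instance rebuilds the same state by replaying from offset 0.
        try (LogBackedCache restarted = new LogBackedCache(log)) {
            System.out.println(restarted.get("Kafka")); // Still rocks
        }
    }
}
```

With the real Kafka clients, `append` would roughly correspond to `KafkaProducer.send(...).get().offset()`. The poller would correspond to a `KafkaConsumer` that is `assign`ed the topic's partition, calls `seekToBeginning`, and runs with `enable.auto.commit=false`. Each write blocks until its own offset has been applied instead of writing directly into the map. As a result, a write becomes visible only after it has round-tripped through the log, and every instance, including a restarted one, applies updates in the same order. The single-partition assumption is what provides that one global order.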
In the second approach, you would instead use KCache, a small library that neatly wraps all of the above functionality behind a simple, map-like `Cache` interface:
```java
import io.kcache.*;
import org.apache.kafka.common.serialization.Serdes;

public class KCacheExample {
    public static void main(String[] args) throws Exception {
        String bootstrapServers = "localhost:9092";
        Cache<String, String> cache = new KafkaCache<>(
            bootstrapServers,
            Serdes.String(),  // for serializing/deserializing keys
            Serdes.String()   // for serializing/deserializing values
        );
        cache.init();  // creates topic, initializes cache, consumer, and producer

        cache.put("Kafka", "Rocks");
        String value = cache.get("Kafka");  // returns "Rocks"
        cache.remove("Kafka");

        cache.close();  // shuts down the cache, consumer, and producer
    }
}
```
The choice is yours. 🙂