Zero To One – Ready To Kickstart Your Software Engineering Career?

This course is designed for someone who has a basic understanding of coding. It aims to give you a flavour of a little bit of everything. There are also links to LeetCode problems that you should solve. The problems are selected to familiarise you with different data structures and algorithms.

Please register for the course using the form here:

Lecture 1: How to approach a low level design.

Why system design? Because to solve any industry problem in the tech domain, you need to understand system design.

Study Resources

Overview in slides

Lecture 2: Git

Git is a version control system. Heard of Linus Torvalds? He is the creator and principal developer of the Linux kernel (notice the similarity between the names Linus and Linux). Linus developed Git to help him with the development of Linux. Git has become so popular that it is now almost synonymous with version control system, or VCS for short.

Study Resources

Git for beginner

Git tutorial

Lecture 3: Database and JDBC

Now that you know about programming, it’s time to learn a bit about databases. SQL, or Structured Query Language, was initially developed at IBM. JDBC is the Java API used to connect to SQL databases from Java.

Study Resources

Overview in slides.

Lecture 4: Web Server

An HTTP server is software that understands URLs (web addresses) and HTTP (the protocol your browser uses to view webpages).

Study Resources

Overview in slides.

Lecture 5: Docker

Docker allows developers to build, package, and deploy applications and services as lightweight, portable containers.

Study Resources

Overview in slides.

Lecture 6: Cloud

The cloud is the Internet—more specifically, it’s all of the things you can access remotely over the Internet.

Study Resources

Overview in slides.

Design a TTL based in-memory cache in golang


Caches are a data storage layer used to avoid repeating an operation that is expensive or hard to compute.

The following examples can be characterised as expensive operations:

  • Data that has to be fetched from a database, resulting in network calls and the use of precious database resources.
  • Data that is calculated using the responses from multiple API calls to different services.
  • Cryptographic operations that take a lot of computing resources.

Essentially, caches are like a water jug kept on the dining table. They will be replenished from tap water when they are empty. Having a jug reduces the effort of multiple people going to get water.

Similarly, as developers, we choose to store the result of an expensive operation in a cache and evict it according to a specified eviction policy. Common eviction policies include LRU (least recently used), LFU (least frequently used) and time-based eviction.

We are going to talk about time-based eviction now. Simply put, we store the data for a specified duration and expire it once that time has elapsed. We will take inspiration from the wonderful yet simple library patrickmn/go-cache, an in-memory key:value store/cache (similar to Memcached) library for Go, suitable for single-machine applications. We will try to explain the code written there and build a simplified version of it.

Before we dive into the code, it’s good to see the cache in action. So go ahead and use it in your code, or just look at the usage example in the patrickmn/go-cache repository.


Caches are usually a combination of two components:

  • Storage layer to store the key-value pair.
  • Eviction layer to remove the data after the eviction condition is met. In our case, we are going to use time-based eviction.

Now, we will see how the library has implemented the two components. We will start with the data structures.


type Item struct {
	Object     interface{}
	Expiration int64
}

Item is used to store the value part of the key-value pair. The Item struct has two fields. Object is an empty interface, which allows us to store any kind of value. Expiration stores the Unix time (in nanoseconds) after which the key-value pair should no longer be visible.


type janitor struct {
	Interval time.Duration
	stop     chan bool
}

Think of the janitor struct as a janitor in real life. It has two fields. Interval is the duration after which the janitor periodically comes to clean up the storage, purging the data that has expired. stop is a channel used to inform the janitor that cleaning is no longer needed.


type cache struct {
	defaultExpiration time.Duration
	items             map[string]Item
	mu                sync.RWMutex
	onEvicted         func(string, interface{})
	janitor           *janitor
}

The cache struct lies at the heart of this library. It has the following fields:

  • defaultExpiration – the time after which a key-value (KV for short) pair expires if it is not set with its own expiry time.
  • items – a map with string as the key and Item as the value.
  • mu – a lock. Since a Go map is not safe for concurrent use, we cannot guarantee its behaviour when it is written to while other goroutines read or write it. The lock ensures that this never happens. sync.RWMutex offers two kinds of lock: Lock() and RLock(). Lock() gives a single goroutine exclusive access for reading and writing. RLock() lets multiple goroutines read at the same time, but none of them may write. Read more about it in the Stack Overflow question “What is the difference between RLock() and Lock() in Golang?”
  • onEvicted – a function passed by the user that acts as a callback. It is called whenever data is evicted. One use could be to call a method that replenishes the cache from the database to prevent the data from getting stale.
  • janitor – as described above, a cleaner that periodically purges expired data from the cache.


type Cache struct {
	*cache
}

A field declared with a type but no explicit field name is an anonymous field, also called an embedded field or an embedding of the type in the struct. An embedded type must be specified as a type name T or as a pointer to a non-interface type name *T, and T itself may not be a pointer type. The unqualified type name acts as the field name.
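A short sketch of what embedding a pointer type looks like in practice. The engine and Car types are invented for this example; the point is only that methods and fields of the embedded type are promoted to the outer struct.

```go
package main

import "fmt"

// engine is a hypothetical inner type, playing the role of the unexported cache.
type engine struct{ started bool }

func (e *engine) Start() { e.started = true }

// Car embeds *engine; the field name is implicitly "engine".
type Car struct {
	*engine
}

func main() {
	c := &Car{&engine{}}
	c.Start()                     // promoted method, same as c.engine.Start()
	fmt.Println(c.started)        // promoted field
	fmt.Println(c.engine.started) // explicit access through the embedded field
}
```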

But the important question is: why does this struct just hold a cache pointer? We will find the answer later.

Now that we have seen the important structs, let’s have a look at the important methods.


func New(defaultExpiration, cleanupInterval time.Duration) *Cache {
	items := make(map[string]Item)
	return newCacheWithJanitor(defaultExpiration, cleanupInterval, items)
}

This is the function developers use to instantiate a new cache. It has two parameters:

  • defaultExpiration – default TTL of a KV pair that is not set with its own TTL.
  • cleanupInterval – time interval after which the janitor purges the expired data.

The function returns a pointer to a Cache. Internally, it calls newCacheWithJanitor.

func newCacheWithJanitor(de time.Duration, ci time.Duration, m map[string]Item) *Cache {
	c := newCache(de, m)
	C := &Cache{c}
	if ci > 0 {
		runJanitor(c, ci)
		runtime.SetFinalizer(C, stopJanitor)
	}
	return C
}

This function initialises c, a struct of type cache, and then wraps it in a Cache. runJanitor starts a goroutine that works on c. Inside it, a Ticker is initialised with the purge interval, and a select statement waits on the ticker’s channel for a tick to be delivered after each interval. Whenever a tick is delivered, the DeleteExpired method is called. Tickers are used when you want to do something repeatedly at regular intervals. They are part of the time package in Go’s standard library.

func (j *janitor) Run(c *cache) {
	ticker := time.NewTicker(j.Interval)
	for {
		select {
		case <-ticker.C:
			c.DeleteExpired()
		case <-j.stop:
			ticker.Stop()
			return
		}
	}
}

func runJanitor(c *cache, ci time.Duration) {
	j := &janitor{
		Interval: ci,
		stop:     make(chan bool),
	}
	c.janitor = j
	go j.Run(c)
}

Since the janitor goroutine holds a reference to c, an object of the cache struct, c would never become eligible for garbage collection. This is why the exported Cache struct exists and simply wraps cache: the goroutine never references the outer Cache, so it can be garbage collected once the user drops it. When that happens, stopJanitor is called through the finalizer registered with runtime.SetFinalizer. runtime.SetFinalizer takes the object as its first argument (here C) and the function to run when that object is garbage collected as its second (here stopJanitor). Once the janitor has stopped, c can be collected as well.


func (c *cache) Add(k string, x interface{}, d time.Duration) error {
	c.mu.Lock()
	_, found := c.get(k)
	if found {
		c.mu.Unlock()
		return fmt.Errorf("Item %s already exists", k)
	}
	c.set(k, x, d)
	c.mu.Unlock()
	return nil
}

func (c *cache) set(k string, x interface{}, d time.Duration) {
	var e int64
	if d == DefaultExpiration {
		d = c.defaultExpiration
	}
	if d > 0 {
		e = time.Now().Add(d).UnixNano()
	}
	c.items[k] = Item{
		Object:     x,
		Expiration: e,
	}
}

The Add method stores the key-value pair only if the key is not already present or the existing entry has expired. c.mu.Lock() allows only one goroutine at a time to access the code block that it guards. This is important because the items map in cache is not thread-safe. The expiry time is stored as Unix time in nanoseconds.


func (c *cache) Get(k string) (interface{}, bool) {
	c.mu.RLock()
	item, found := c.items[k]
	if !found {
		c.mu.RUnlock()
		return nil, false
	}
	if item.Expiration > 0 {
		if time.Now().UnixNano() > item.Expiration {
			c.mu.RUnlock()
			return nil, false
		}
	}
	c.mu.RUnlock()
	return item.Object, true
}

The Get method is used to fetch the value for a given key, if it is available in the cache. Notice that Get uses RLock(), which allows multiple goroutines to enter the guarded code block for reading but not for writing.

A key-value pair is not removed immediately when its expiry time passes. DeleteExpired() runs periodically, once per purge interval, and deletes the expired key-value pairs. Until then, the Get method hides expired entries by comparing the current time with the expiry time.

func (c *cache) DeleteExpired() {
	var evictedItems []keyAndValue
	now := time.Now().UnixNano()
	c.mu.Lock()
	for k, v := range c.items {
		// "Inlining" of expired
		if v.Expiration > 0 && now > v.Expiration {
			ov, evicted := c.delete(k)
			if evicted {
				evictedItems = append(evictedItems, keyAndValue{k, ov})
			}
		}
	}
	c.mu.Unlock()
	for _, v := range evictedItems {
		c.onEvicted(v.key, v.value)
	}
}

Implement your own cache with time based eviction

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

const (
	// Assumed values, mirroring go-cache's NoExpiration and DefaultExpiration.
	INFINITY time.Duration = -1 // the entry never expires
	DEFAULT  time.Duration = 0  // use the cache's default expiry duration
)

func main() {
	fmt.Println("Hello World")
	cache := New(10*time.Hour, 20*time.Minute)
	cache.Set("foo", "bar", 2*time.Minute)
	value, found := cache.Get("foo")
	if found {
		fmt.Println("Value is ", value)
	}
}

type Data struct {
	Value    interface{}
	ExpireAt int64
}

type Cleaner struct {
	Interval time.Duration
	stop     chan bool
}

type cache struct {
	defaultExpiryDuration time.Duration
	kvstore               map[string]Data
	locker                sync.RWMutex
	cleaner               *Cleaner
	onRemoval             func(string, interface{})
}

type Cache struct {
	*cache
}

func New(defaultExpiryDuration time.Duration, cleanUpInterval time.Duration) *Cache {
	if defaultExpiryDuration == 0 {
		defaultExpiryDuration = INFINITY
	}

	cache := &cache{
		defaultExpiryDuration: defaultExpiryDuration,
		kvstore:               make(map[string]Data),
	}

	Cache := &Cache{cache}

	if cleanUpInterval > 0 {
		clean(cleanUpInterval, cache)
		runtime.SetFinalizer(Cache, stopCleaning)
	}
	return Cache
}

func clean(cleanUpInterval time.Duration, cache *cache) {
	cleaner := &Cleaner{
		Interval: cleanUpInterval,
		stop:     make(chan bool),
	}

	cache.cleaner = cleaner
	go cleaner.Cleaning(cache)
}

// Cleaning purges expired entries on every tick until it is told to stop.
func (c *Cleaner) Cleaning(cache *cache) {
	ticker := time.NewTicker(c.Interval)

	for {
		select {
		case <-ticker.C:
			cache.purge()
		case <-c.stop:
			ticker.Stop()
			return
		}
	}
}

func stopCleaning(cache *Cache) {
	cache.cleaner.stop <- true
}

func (cache *cache) purge() {
	now := time.Now().UnixNano()
	cache.locker.Lock()
	defer cache.locker.Unlock()
	for key, data := range cache.kvstore {
		// ExpireAt == 0 means the entry never expires.
		if data.ExpireAt > 0 && data.ExpireAt < now {
			delete(cache.kvstore, key)
		}
	}
}

func (c *cache) Set(key string, value interface{}, expiryDuration time.Duration) {
	if expiryDuration == DEFAULT {
		expiryDuration = c.defaultExpiryDuration
	}
	var expireAt int64

	if expiryDuration > 0 {
		expireAt = time.Now().Add(expiryDuration).UnixNano()
	}
	c.locker.Lock()
	defer c.locker.Unlock()
	c.kvstore[key] = Data{
		Value:    value,
		ExpireAt: expireAt,
	}
}

func (c *cache) Get(key string) (interface{}, bool) {
	c.locker.RLock()
	defer c.locker.RUnlock()
	data, found := c.kvstore[key]
	if !found {
		return nil, false
	}

	// ExpireAt == 0 means the entry never expires.
	if data.ExpireAt > 0 && data.ExpireAt < time.Now().UnixNano() {
		return nil, false
	}

	return data.Value, true
}

Here’s a simplified version of a TTL-based cache. It is recommended that you design and implement it yourself first.

Zero To One – Ready To Kickstart Your Software Engineering Career?

This course is designed for someone who has little to no understanding of coding. It aims to give you a flavour of a little bit of everything. There are also links to LeetCode problems that you should solve. The problems are selected to familiarise you with different data structures and algorithms.

Please register for the course using the form here:

Lecture 1 – Basic Java

Why Java? Because Java is an easy-to-learn object-oriented programming (OOP) language that is used by many companies across the globe.

Study Resources

The following link consists of three slides. These slides introduce the reader to Java and fundamental datatypes. Once you complete it, please make sure that you attempt the problem statements in the lab given below.

Java Lecture Slides


The questions here have to be solved on LeetCode. LeetCode is a platform commonly used to improve data structure and algorithm skills. The intention behind these problems is to solve

Problem 1

Problem 2

Problem 3

Lecture 2 – Git

Git is a version control system. Heard of Linus Torvalds? He is the creator and principal developer of the Linux kernel (notice the similarity between the names Linus and Linux). Linus developed Git to help him with the development of Linux. Git has become so popular that it is now almost synonymous with version control system, or VCS for short.

Study Resources

Git for beginner

Git tutorial


1. Create a GitHub account

2. Push a sample text file to a repository created in your GitHub account

Why GitHub? What is GitHub?

GitHub is one of the largest websites in terms of hosted software. It allows users to host their code using Git.

3. Problem 1

Lecture 3 – Java CRUD operation


Install Intellij


Hello World


Make a console-based application that supports CRUD (Create, Read, Update, Delete) operations for the e-commerce domain.

The application should support the following features:

  • Any user should be able to sign up, log in and log out.
  • Logged-in users should be able to browse products.
  • Logged-in users should have a shopping cart to which they can add multiple products.
  • Users should be able to check out, and the total payable should be displayed at checkout.
  • A user should have the following attributes: name, user id, address, date of birth.
  • A product should have the following attributes: name, product id, description, and price.
  • User and product information should be persisted in memory.
  • The console should offer an option for each of the operations mentioned above.
  • Push it to your Git repository on GitHub.

Lecture 4: SQL

Now that you know about programming, it’s time to learn a bit about databases. SQL, or Structured Query Language, was initially developed at IBM. It is used to query and manage data in an RDBMS.

Study Resources

SQL tutorial


Install MySQL

Make a database for a school management system.

Problem statement

Lecture 5: REST APIs


What is REST API?

Best Practices

What is maven?


Learn Maven

Create your first REST API

Push code in your github repo.

Problem statement

Lecture 6: Form Submission

Study Resources



Modify the project done in week 5 and complete the tutorial given here.

Push the new code to the existing repository.


Lecture 7: JPA

Study Resources

What is JPA?


Modify the project done in week 5 and complete the tutorial given here.

Push the new code to the existing repository.


Lecture 8: AWS

You have completed a web application on your local computer. It’s time to deploy it to the cloud. AWS allows you to use some of its services for free, and we will use those.


Deploy the project of week 5 on AWS


Lecture 9: Load Balancer


What is nginx?


Use nginx in front of the application deployed in Lab 8 on AWS.


If you are stuck anywhere, feel free to comment.

Kafka – Everything that you should know before interview

Introduction to Apache Kafka Concepts

Apache Kafka – Set up your first Kafka Producer and Consumer

Kafka Internals: How does Kafka store the data?

Reliable Data Delivery In Kafka

Troubleshooting Under Replicated Kafka Partitions

Kafka Broker Metrics And Their Debugging

Kafka Monitoring Using JMX, Prometheus And Grafana

What Is Stream Processing?


What Is Stream Processing?

What is a data stream or event stream?

A data stream is an abstraction representing an unbounded dataset. Unbounded means that the data is infinite and grows over time as new records keep getting added to the dataset. This definition says nothing about the data contained in the events or the number of events per second. The data differs from system to system: events can be tiny (sometimes only a few bytes) or very large (XML messages with many headers); they can also be completely unstructured, key-value pairs, semi-structured JSON, or structured Avro or Protobuf messages.

What are some examples of a data stream?

Every business transaction can be seen as a stream of events. Think about making a payment through your favourite mobile wallet app. We can summarise the business transaction as the following sequence of events:

  1. Open the app.
  2. Authenticate with your biometric details or enter the passcode.
  3. Scan the QR code or enter the wallet ID of the receiver.
  4. Enter the amount to be transferred.
  5. Enter the secure code.
  6. Get the payment confirmation screen.

Just like this, every other business transaction can be modelled as a sequence of events. Think of stock trades, package deliveries, network events going through a switch, events reported by sensors in manufacturing equipment, emails sent, moves in a game: all of these are essentially streams of events.

What are the properties of a data stream?

  1. Event streams are ordered – Events are ordered with respect to time. We can say with confidence that event X occurred after event Y. A business transaction where the first event is a $1000 credit and the second event is a $1000 debit to a bank account is different from one where the debit occurs first and the credit happens next. The second transaction involves overdraft charges, whereas the first is a fairly normal one.
  2. Immutable data records – An event can never be modified after it has occurred. A cancelled financial transaction does not disappear. Rather, another event records the cancellation against the previous transaction.
  3. Event streams are replayable – This is a desirable property. For the majority of business applications, it is critical to be able to replay a raw stream of events that occurred months (and sometimes years) earlier. This is required in order to correct errors, try new methods of analysis, or perform audits.
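The three properties can be sketched with a tiny append-only log. The Event and Log types below are hypothetical and exist only for this illustration: events get an increasing sequence number (ordered), nothing is ever edited in place (immutable), and Replay walks the full history from the start (replayable).

```go
package main

import "fmt"

// Event is a hypothetical immutable record; Amount is in dollars, negative for debits.
type Event struct {
	Seq    int
	Kind   string
	Amount int
}

// Log is an append-only, ordered list of events.
type Log struct {
	events []Event
}

// Append adds an event at the end; existing events are never modified.
func (l *Log) Append(kind string, amount int) {
	l.events = append(l.events, Event{Seq: len(l.events) + 1, Kind: kind, Amount: amount})
}

// Replay feeds every event, in order, to fn.
func (l *Log) Replay(fn func(Event)) {
	for _, e := range l.events {
		fn(e)
	}
}

func main() {
	var l Log
	l.Append("credit", 1000)
	l.Append("debit", -1000)
	l.Append("cancel-debit", 1000) // a cancellation is a new event, not an edit

	balance := 0
	l.Replay(func(e Event) {
		balance += e.Amount
		fmt.Printf("#%d %-12s balance=%d\n", e.Seq, e.Kind, balance)
	})
}
```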


Stream processing fills the gap between the request-response world, where we wait for events that take two milliseconds to process, and the batch processing world, where data is processed once a day and takes many hours to complete. Many business processes need neither request-response nor batch processing. They want something that continuously reads data from an unbounded dataset, does something to it, and emits output, which can then be presented as a report to the end user or stored in a database as some business property. The processing has to be continuous and ongoing.

Stream-Processing Concepts

Stream processing is just like any other data processing, where we:

a. Get the data.

b. Do transformation on data.

c. Aggregate the data.

d. Store the data.

e. Present the data.

However, there are some key concepts which are useful for developing any stream application.


In the context of stream processing, having a common notion of time is critical because most stream applications perform operations on time windows. For example, our stream application might calculate a moving five-minute count of total orders placed. In that case, we need to know what to do when one of our data servers goes offline for two hours and returns with two hours’ worth of data. Most of that data will be relevant for five-minute time windows that have long passed and for which the result was already calculated and stored.

Stream processing frameworks have the following notions of time:

Event time

This is the time when the event happened. For example, when any user visits our website, that time is the event time for that event. Event time is usually the time that matters most when processing stream data.

Processing time

This is the time at which a stream-processing application received the event in order to perform some calculation. This time can be milliseconds, hours, or days after the event occurred. This notion of time is highly unreliable and best avoided.


Processing a single event in isolation may be easy, but stream processing is usually more involved than that. It typically includes operations such as the following (the list is not exhaustive):

  • Counting the number of events by type
  • Moving averages over a 5-minute window
  • Joining two streams to create an enriched stream of information
  • Aggregating data over an hour
  • Sum, average and quantiles over data

We call the information that is stored between events state. State can be of the following two kinds:

Internal State

State that is accessible only by a specific instance of the stream-processing application. This state is usually maintained and managed with an embedded, in-memory database running within the application, which makes it very fast. However, since it is in memory, we are limited by the amount of data it can store. As a consequence, stream processing is sometimes done by splitting the data into several sub-streams so that each can be processed using internal state.

External State

State that is kept in an external datastore, such as Cassandra, is called external state. Its advantages are virtually unlimited size and the fact that the data is accessible from anywhere. However, being external means that we have to bear the extra latency and the added complexity of another system.
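To make the idea of state concrete, here is a minimal sketch of internal state: a hypothetical counter type that keeps a plain in-memory map between events and uses it to count events by type. A real framework would typically use an embedded store and persist it, which this sketch does not attempt.

```go
package main

import "fmt"

// counter is a hypothetical processor whose internal state is an in-memory map.
type counter struct {
	byType map[string]int // state carried from one event to the next
}

func newCounter() *counter {
	return &counter{byType: make(map[string]int)}
}

// process handles one event and returns the updated count for its type.
func (c *counter) process(eventType string) int {
	c.byType[eventType]++
	return c.byType[eventType]
}

func main() {
	c := newCounter()
	for _, t := range []string{"click", "view", "click", "purchase", "click"} {
		fmt.Println(t, c.process(t))
	}
}
```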

Stream-Table Duality

A database table allows checking the state of the data at a specific point in time. Unlike tables, streams contain a history of changes. Streams are a string of events wherein each event caused a change. A table contains a current state of the world, which is the result of many changes.

Let’s assume that we are tracking the events of an ATM. The following events could happen:

  • Bank stores 10000$ in the ATM at 10:00 AM.
  • Person A withdraws 10K at 10:05 AM.
  • Person B withdraws 1K at 11:05 AM.
  • Person C withdraws 2K at 12:05 PM.
  • Person D withdraws 3K at 12:08 PM.
  • Person E withdraws 4K at 03:05 PM.

The database tells us the balance in the ATM at any point in time. The stream tells us how busy the ATM is and which hour is the busiest. Stream and table are two views of the same business transactions.
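A small sketch of the two views. The Txn type and the amounts in main are invented for this example and are not the figures listed above. balance folds the stream into the current state (the table view), while busiestHour needs the history itself (the stream view).

```go
package main

import "fmt"

// Txn is a hypothetical ATM event: positive amounts are deposits,
// negative amounts are withdrawals. Hour is the hour of day (0-23).
type Txn struct {
	Hour   int
	Amount int
}

// balance folds the stream into the "table" view: the current state.
func balance(stream []Txn) int {
	total := 0
	for _, t := range stream {
		total += t.Amount
	}
	return total
}

// busiestHour uses the "stream" view: it needs the history, not the final state.
// On a tie, the hour that reached the highest count first wins.
func busiestHour(stream []Txn) int {
	perHour := make(map[int]int)
	best, bestCount := -1, 0
	for _, t := range stream {
		perHour[t.Hour]++
		if perHour[t.Hour] > bestCount {
			best, bestCount = t.Hour, perHour[t.Hour]
		}
	}
	return best
}

func main() {
	stream := []Txn{
		{9, 20000}, // bank loads the ATM
		{10, -1000}, {11, -1000}, {12, -2000}, {12, -3000}, {15, -4000},
	}
	fmt.Println("balance:", balance(stream))
	fmt.Println("busiest hour:", busiestHour(stream))
}
```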

Stream Processing Design Pattern

Single-Event Processing

Here, the stream processing framework consumes a single message, does some data manipulation and then writes the output to another stream. An example could be a framework that checks the fraud_probablity of each event and puts it into a stream that sends an email to the end user.
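Single-event processing maps naturally onto Go channels. In this sketch the Payment type, its precomputed FraudProbability field and the 0.9 threshold are all assumptions made for illustration. Each event is looked at on its own, and no state is kept between events.

```go
package main

import "fmt"

// Payment is a hypothetical event; FraudProbability is assumed to be precomputed.
type Payment struct {
	ID               string
	FraudProbability float64
}

// flagSuspicious reads one event at a time from in and forwards those above
// the threshold to the returned stream. No state is kept between events.
func flagSuspicious(in <-chan Payment, threshold float64) <-chan Payment {
	out := make(chan Payment)
	go func() {
		defer close(out)
		for p := range in {
			if p.FraudProbability > threshold {
				out <- p
			}
		}
	}()
	return out
}

func main() {
	in := make(chan Payment)
	go func() {
		defer close(in)
		in <- Payment{"p1", 0.05}
		in <- Payment{"p2", 0.93}
		in <- Payment{"p3", 0.40}
	}()
	for p := range flagSuspicious(in, 0.9) {
		fmt.Println("alert:", p.ID)
	}
}
```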

Processing with Local State

Most stream-processing applications are concerned with aggregating information, especially time-window aggregation. An example could be to find the busiest hour of the website in order to scale the infrastructure. These aggregations require maintaining a state for the stream. As in our example, in order to calculate the number of website hits in an hour, we need to keep a counter to keep track of website hits in moving window of one hour. This could be done in a local state.

Assume we want to find the hits on different pages of the website. We can partition the stream by page and then aggregate each partition using a local state.

However, the local state must fit in memory, and it should be persisted so that we can recover it if the infrastructure crashes.

Multiphase Processing

Multiphase processing incorporates the following phases:

  1. Aggregate data using a local state
  2. Publish the data into a new stream
  3. Aggregate the data from the new stream published in phase 2.

This type of multiphase processing is very familiar to those who write map-reduce code, where you often have to resort to multiple reduce phases.
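The phases can be sketched as two functions. This is a simplified, in-process illustration with hypothetical names (partitionOf, phase1, phase2). The slice returned by phase1 stands in for the new stream of phase 2, and the question answered is which page has the most hits overall.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionOf is a hypothetical partitioner: the same key always maps to the same partition.
func partitionOf(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// phase1 aggregates hits per page inside each partition (local state).
func phase1(pages []string, n int) []map[string]int {
	local := make([]map[string]int, n)
	for i := range local {
		local[i] = make(map[string]int)
	}
	for _, p := range pages {
		local[partitionOf(p, n)][p]++
	}
	return local
}

// phase2 consumes the per-partition results and computes a global answer:
// the page with the most hits.
func phase2(local []map[string]int) (string, int) {
	bestPage, bestHits := "", 0
	for _, m := range local {
		for page, hits := range m {
			if hits > bestHits {
				bestPage, bestHits = page, hits
			}
		}
	}
	return bestPage, bestHits
}

func main() {
	hits := []string{"/home", "/cart", "/home", "/search", "/home", "/cart"}
	page, n := phase2(phase1(hits, 3))
	fmt.Println(page, n)
}
```

Because every hit for a given page lands in the same partition, each partition’s count for that page is already complete, and the second phase only has to compare the partial winners.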

Stream-Table Join

Sometimes stream processing requires integration with data external to the stream, such as validating transactions against a set of rules stored in a database, or enriching clickstream information with data about the users who clicked.

Making an external database call for every event would mean not only extra latency, but also additional load on the database. The other constraint is that, for the same sort of infrastructure, the number of events a streaming platform can process is an order of magnitude higher than what a database can handle. So this is clearly not a very scalable solution. Caching can be one strategy, but caching means that one needs to manage the cache infrastructure and the data lifecycle. For example, how would you make sure that the data is not stale? One solution is to stream the database changes and update the cache based on the data in that stream.
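Here is a minimal sketch of that last idea, with hypothetical UserChange and Click events. The local userCache is updated from a stream of database changes, and clicks are enriched from it without calling the database.

```go
package main

import "fmt"

// UserChange is a hypothetical change event captured from the users table.
type UserChange struct {
	UserID string
	Name   string
}

// Click is a hypothetical clickstream event.
type Click struct {
	UserID string
	Page   string
}

// userCache is a local copy of the table, kept fresh by the change stream.
type userCache map[string]string

// apply records the latest version of a user row.
func (c userCache) apply(ch UserChange) { c[ch.UserID] = ch.Name }

// enrich joins a click with the cached user row, with no database call.
func (c userCache) enrich(cl Click) string {
	name, ok := c[cl.UserID]
	if !ok {
		name = "unknown"
	}
	return fmt.Sprintf("%s viewed %s", name, cl.Page)
}

func main() {
	cache := userCache{}
	cache.apply(UserChange{"u1", "Asha"})
	fmt.Println(cache.enrich(Click{"u1", "/home"}))
	cache.apply(UserChange{"u1", "Asha K."}) // the row changed in the database
	fmt.Println(cache.enrich(Click{"u1", "/cart"}))
}
```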

Streaming Join

For example, let’s say that we have one stream with search queries that people entered into our website and another stream with clicks, which include clicks on search results. We want to match search queries with the results they clicked on so that we will know which result is most popular for which query. When you join two streams, you are joining the entire history, trying to match events in one stream with events in the other stream that have the same key and happened in the same time-windows. This is why a streaming-join is also called a windowed-join.

Joining two streams of events over a moving time window

Out-of-Sequence Events

Handling events that arrive at the stream at the wrong time is a challenge not just in stream processing but also in traditional ETL systems. For example, the mobile device of an Uber driver loses signal for a few minutes and sends a few minutes’ worth of events when it reconnects.

In such a scenario, the framework has to do the following things:

a. Recognize that an event is out of sequence.

b. Define a time period during which it will attempt to reconcile. Outside the prescribed time period, the data will be considered useless.

c. Be able to update results which might mean updating a row in database.

Common Stream Processing Frameworks

Apache Storm, Apache Spark Streaming, Apache Flink, Apache Samza.

Kafka also provides its own streaming API, Kafka Streams.


Kafka – The Definitive Guide

Kafka Broker Metrics And Their Debugging

If you are new to Kafka, please read the earlier posts of the series listed below. Otherwise, dive in.

Introduction to Kafka

Kafka Internals

Reliable Data Delivery in Kafka

Troubleshooting Under Replicated Kafka Partitions

If you are preparing for an interview, this post contains most of the things that you should know about Kafka.

Active Controller Count

The active controller is the broker in a Kafka cluster that is designated to do administrative tasks like reassigning partitions. The active controller count metric tells us whether a broker is the controller for the cluster. The value of this metric is either 0 or 1, and it is emitted per broker.

What if two brokers say that they are the controller?

A value of 1 means that the broker is currently the controller. A Kafka cluster requires exactly one controller, and only one broker can be the controller at any given time, so two brokers reporting 1 indicates a problem.

What should you do when more than one broker claims to be the controller?
This situation will affect the administrative tasks of the cluster. A first step could be to restart the brokers claiming to be the controller.
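The rule is easy to turn into a check: add up the active controller count values reported by all brokers and expect exactly 1. In the sketch below, the function name and the map of broker ID to metric value are hypothetical. How you collect the values depends on your monitoring setup.

```go
package main

import "fmt"

// controllerStatus interprets the active controller count values reported by
// each broker. The map of broker ID to metric value is hypothetical input,
// for example scraped from a monitoring system.
func controllerStatus(counts map[int]int) string {
	total := 0
	for _, v := range counts {
		total += v
	}
	switch {
	case total == 1:
		return "ok"
	case total == 0:
		return "no active controller"
	default:
		return "multiple controllers"
	}
}

func main() {
	fmt.Println(controllerStatus(map[int]int{1: 0, 2: 1, 3: 0}))
	fmt.Println(controllerStatus(map[int]int{1: 1, 2: 1, 3: 0}))
}
```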

Metric Name


Request Handler Idle Ratio

Following are the two thread pools used by Kafka to handle requests:

Network Handlers

These are responsible for reading and writing data to the clients across the network. This does not require significant processing, so the network handlers do not get exhausted easily.

Request Handlers

The request handler threads, however, are responsible for servicing the client request itself, which includes reading or writing the messages to disk. The request handler idle ratio metric indicates the percentage of time the request handlers are not in use. The lower this number, the more loaded the broker is. If the idle ratio drops below 20%, it is advisable to check whether the cluster is undersized or has some other problem.

Kafka uses purgatory to efficiently handle requests.
Read about purgatory here.

Metric Name


All Topics Bytes In

The all topics bytes in rate, expressed in bytes per second, is useful as a measurement of how much message traffic your brokers are receiving from producing clients. This is a good metric to trend over time to help you determine when you need to expand the cluster or do other growth-related work. It is also useful for evaluating if one broker in a cluster is receiving more traffic than the others, which would indicate that it is necessary to rebalance the partitions in the cluster.

Metric Name


All Topics Bytes Out

The all topics bytes out rate, similar to the bytes in rate, is another overall growth metric. In this case, the bytes out rate shows the rate at which consumers are reading messages out. The outbound bytes rate may scale differently than the inbound bytes rate.

The outbound bytes rate also includes the replica traffic. This means that if all of the topics are configured with a replication factor of 2, we will see a bytes out rate equal to the bytes in rate when there are no consumer clients.
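The arithmetic behind that statement can be written down as a rough estimate. The function below is a back-of-the-envelope sketch with a made-up name. It assumes that every topic has the same replication factor and that each consumer group reads every message exactly once.

```go
package main

import "fmt"

// expectedBytesOut estimates the cluster-wide bytes out rate from the bytes in
// rate. Assumptions: every topic has the same replication factor, and each
// consumer group reads every message exactly once.
func expectedBytesOut(bytesIn float64, replicationFactor, consumerGroups int) float64 {
	replicaTraffic := bytesIn * float64(replicationFactor-1) // followers fetching from leaders
	consumerTraffic := bytesIn * float64(consumerGroups)
	return replicaTraffic + consumerTraffic
}

func main() {
	// Replication factor 2 and no consumers: bytes out equals bytes in.
	fmt.Println(expectedBytesOut(100, 2, 0))
	// Replication factor 3 and two consumer groups.
	fmt.Println(expectedBytesOut(100, 3, 2))
}
```

This also shows why the outbound rate can scale differently from the inbound rate: adding a consumer group or raising the replication factor increases bytes out without any change in what producers send.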

Metric Name


Other Important Kafka Broker Metrics

| Name | Description | Metrics Name |
| --- | --- | --- |
| All topics messages in | The messages in rate shows the number of individual messages, regardless of their size, produced per second. This is useful as a growth metric as a different measure of producer traffic. | |
| Partition count | The partition count for a broker generally doesn’t change that much, as it is the total number of partitions assigned to that broker. This includes every replica the broker has, regardless of whether it is a leader or follower for that partition. | |
| Leader count | The leader count metric shows the number of partitions that the broker is currently the leader for. As with most other measurements in the brokers, this one should be generally even across the brokers in the cluster. | kafka.server: |
| Offline partitions | This measurement is only provided by the broker that is the controller for the cluster (all other brokers will report 0), and shows the number of partitions in the cluster that currently have no leader. | kafka.controller: |

Kafka Broker Metrics


Kafka – The Definitive Guide by Neha Narkhede, Gwen Shapira & Todd Palino


Troubleshooting Under Replicated Kafka Partitions

There are two types of replica: leader replicas and follower replicas. Let’s say that there are three replicas of a partition. One of them is the leader. All requests from producers and consumers go through the leader in order to guarantee consistency. All the replicas other than the leader are called followers. Followers do not serve any requests; their only task is to keep themselves up to date with the leader. One of the follower replicas becomes the leader in case the leader fails. The process by which followers fetch data from the leader is called replication. Please go through the article here, if you wish to read more about Kafka internals.

Replication is the core of Kafka.

If replication is not happening and the leader fails, there is no follower that can be cleanly promoted to leader, and this leads to inconsistent data. So, how do we make sure that our partitions are not under-replicated?

Fortunately, Kafka exposes a JMX metric that tells us whether replication is behaving well. We can export this metric to Prometheus and set up an alert on it. This article briefly discusses the potential reasons for under-replicated partitions and how to debug and fix them.

JMX Metric Name


This is the one must-have metric that should be monitored. For each broker in the cluster, it reports the number of partitions led by that broker whose follower replicas are not caught up with the leader replica. This single measurement provides insight into a number of problems with the Kafka cluster, from a broker being down to resource exhaustion.

How to debug under replicated partitions?

Broker-level problems

If a broker is down, none of the replicas on it are in sync with their leaders, and you will see a constant number of under-replicated partitions.

If the number of under-replicated partitions is not constant, or if all the brokers are up and running and you still see a constant count of under-replicated partitions, this typically indicates a performance issue in the cluster. If the under-replicated partitions are on a single broker, then that broker is typically the problem. We can see the list of under-replicated partitions using the tool as shown in the picture below. In that list, broker 2 appears in the replica set of every under-replicated partition. This means broker 2 is not working well.

Image taken from the book: Kafka – The Definitive Guide showing use of to find non-performing broker
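The same reasoning can be sketched in a few lines of Python. The partition data below is hypothetical sample input that we typed in by hand; it is not the output of any Kafka tool.

```python
from collections import Counter

# Hypothetical sample: (topic, partition, assigned replicas, in-sync replicas).
under_replicated = [
    ("orders", 0, [1, 2, 3], [1, 3]),
    ("orders", 4, [2, 3, 4], [3, 4]),
    ("clicks", 1, [4, 1, 2], [4, 1]),
]


def lagging_brokers(partitions):
    """Count how often each broker holds a replica that is missing from the ISR."""
    counts = Counter()
    for _topic, _partition, replicas, isr in partitions:
        for broker in replicas:
            if broker not in isr:
                counts[broker] += 1
    return counts


suspects = lagging_brokers(under_replicated)
```

Here broker 2 is the only broker missing from the in-sync list of every partition, so it is the first place to look.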

Cluster-level problems

Unbalanced load

How do we define unbalanced load on a broker in a cluster? If any broker has a dramatically higher partition count, or significantly more bytes going in or out than the other brokers in the cluster, then the load on that cluster can be considered unbalanced. In order to diagnose this problem, we will need several metrics from the brokers in the cluster:

  • Leader partition count
  • All topics bytes in rate
  • All topics messages in rate
  • Partition count

In a balanced cluster, all of these metrics are approximately the same on every broker.
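One way to make "approximately the same" concrete is to compare each broker against the cluster mean. This is only a sketch: the broker numbers are made up, and the 20% tolerance is our own assumption, not a Kafka recommendation.

```python
def is_balanced(values, tolerance=0.2):
    """Return True if every value is within `tolerance` of the mean."""
    mean = sum(values) / len(values)
    if mean == 0:
        return all(v == 0 for v in values)
    return all(abs(v - mean) / mean <= tolerance for v in values)


def unbalanced_metrics(brokers, tolerance=0.2):
    """Return the names of metrics that differ too much between brokers."""
    names = next(iter(brokers.values())).keys()
    return [
        name for name in names
        if not is_balanced([b[name] for b in brokers.values()], tolerance)
    ]


# Hypothetical per-broker readings, keyed by broker ID.
brokers = {
    1: {"partitions": 100, "leaders": 50, "bytes_in": 3.56e6, "messages_in": 9450},
    2: {"partitions": 101, "leaders": 49, "bytes_in": 3.66e6, "messages_in": 9570},
    3: {"partitions": 100, "leaders": 50, "bytes_in": 3.23e6, "messages_in": 8740},
}
report = unbalanced_metrics(brokers)  # empty list: nothing stands out
```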

What if traffic is not balanced within the cluster and results in under replicated partitions?

We will need to move partitions from the heavily loaded brokers to the less heavily loaded brokers. This is done using the tool

Another common cluster performance issue is exceeding the capacity of the brokers to serve requests. There are many possible resource bottlenecks that could slow things down. CPU, disk IO, and network throughput are some of those resources. Disk utilization is not one of them, as the brokers will operate properly right up until the disk is filled, and then the disk will fail abruptly.

How do we diagnose a capacity problem?

There are many metrics you can track at the OS level, including:

  • Inbound network throughput
  • Outbound network throughput
  • Disk average wait time
  • Disk percent utilization
  • CPU utilization

Under-replicated partitions can be the result of exhausting any one of the resources listed above. It’s important to know that the broker replication process operates in exactly the same way that other Kafka clients do. If our cluster is having problems with replication, then our clients must be having problems with producing and consuming messages as well. It makes sense to develop a baseline for these metrics when our cluster is operating correctly and then set thresholds that indicate a developing problem long before we run out of capacity. We should also review the trend for these metrics as the traffic to our cluster increases over time. As far as Kafka broker metrics are concerned, the All Topics Bytes In Rate is a good guideline to show cluster usage.

Application-level problem

We should also check if there is another application running on the system that is consuming resources and putting pressure on the Kafka broker. This could be something that was installed to debug a problem, or it could be a process that is supposed to be running, like a monitoring agent, but is having problems. We can use the tools on our system, such as top, to identify if there is a process that is using more CPU or memory than expected.

There could also be a configuration problem that has crept into the broker or system configuration.

Hardware-level Problem

Hardware problems can be as obvious as a server that stops working, or they can be less obvious and show up only as performance problems. The latter are usually soft failures that allow the system to keep running but in a degraded mode. This could be a bad bit of memory, where the system has detected the problem and bypassed that segment (reducing the overall available memory). The same can happen with a CPU failure.

For problems such as these, we should be using the facilities that our hardware provides, such as an intelligent platform management interface (IPMI), to monitor hardware health. When there’s an active problem, looking at the kernel ring buffer using dmesg will help us see log messages that are getting thrown to the system console.

Disk failure

The more common type of hardware failure that leads to a performance degradation in Kafka is a disk failure. Apache Kafka is dependent on the disk for persistence of messages, and producer performance is directly tied to how fast our disks commit those writes. Any deviation in this will show up as problems with the performance of the producers and the replica fetchers. The latter is what leads to under-replicated partitions. As such, it is important to monitor the health of the disks at all times and address any problems quickly.

A single disk failure on a single broker can destroy the performance of an entire cluster. This is because the producer clients will connect to all brokers that lead partitions for a topic, and if you have followed best practices, those partitions will be evenly spread over the entire cluster. If one broker starts performing poorly and slowing down produce requests, this will cause back-pressure in the producers, slowing down requests to all brokers.

If you are new to Kafka, please read the first two posts of the series given below.

Introduction to Kafka

Kafka Internals

Setup Kafka Monitoring


Kafka: The Definitive Guide


Tutorial For Kafka Monitoring Using JMX, Prometheus And Grafana

If you are new to Kafka, please read the first three posts of the series given below. Else dive in. 

Introduction to Kafka

Kafka Internals

Reliable Data Delivery in Kafka

How do you monitor your Kafka setup?

A number of measurements are collected while Kafka is operational. We are going to collect these measurements in Prometheus and later build a dashboard in Grafana. All of the metrics exposed by Kafka can be accessed via the Java Management Extensions (JMX) interface. One way to use them in an external monitoring system is to attach a collection agent provided by Prometheus to the Kafka process.

What is JMX?

JMX provides an API to monitor and manage your resources at runtime. It provides Java developers with the means to instrument Java code, create smart Java agents, implement distributed management middleware and managers, and easily integrate these solutions into existing management and monitoring systems.

The application, in our case Kafka, has to implement a Java interface called MBean. You can see the implementation here. Kafka creates the MBean and registers it. We now need to access the data by exposing it in some manner. We can get access through JConsole or through the available communication adaptors or connectors. In this tutorial, we are going to use the JMX Prometheus exporter. There are other exporters too, like the Nagios XI check_jmx plugin, jmxtrans, Jolokia and MX4J.

Quickstart Kafka

If your kafka broker is not set up, please follow the step by step guide here.

JMX Prometheus Java Agent

Java agents are part of the Java Instrumentation API. The Instrumentation APIs provide a mechanism to modify the bytecode of methods. This can be done both statically and dynamically. This means that we can change a program by adding code to it without having to touch the actual source code of the program. The result can have a significant impact on the overall behavior of the application. The JMX Prometheus exporter is a collector that can be configured to scrape and expose the MBeans of a JMX target. The data collected in Prometheus can later be shown in Grafana. This exporter is intended to be run as a Java agent, exposing an HTTP server and serving metrics of the local JVM.

Download the JMX Prometheus Java agent by the following command:

sudo wget -P /opt/kafka/prometheus/

Download the config for java agent:

cd /opt/kafka/prometheus/

The description of the yml file can be found here. Basically, the configuration is used to transform the metrics into a form that Prometheus understands.

Setting Kafka Options

KAFKA_OPTS is an environment variable that you can set to pass custom settings to the Java Virtual Machine (JVM) that runs Kafka.

export KAFKA_OPTS="-javaagent:/opt/kafka/prometheus/jmx_prometheus_javaagent-0.12.0.jar=7071:/opt/kafka/prometheus/kafka-2_0_0.yml"

Setting Heap Options

KAFKA_HEAP_OPTS is an environment variable that you can set to pass custom heap settings to the Java Virtual Machine (JVM) that runs Kafka.

export KAFKA_HEAP_OPTS="-Xmx1000M -Xms1000M"

Start your kafka

bin/ config/

Metrics exposed over http

You can check metrics at localhost:7071

Kafka Broker Metrics
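If you want to inspect this output from a script instead of a browser, the text format is simple to read. The sketch below is a simplified parser of our own: it ignores comment lines and assumes samples carry no trailing timestamp. The sample text and its value are made up for illustration.

```python
def parse_metrics(text):
    """Parse Prometheus text exposition lines into a {series: value} dict.

    Simplified: skips comments and assumes no trailing timestamps.
    """
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        series, value = line.rsplit(" ", 1)
        samples[series] = float(value)
    return samples


# Hypothetical excerpt of what the exporter might serve.
sample = """\
# TYPE kafka_server_kafkaserver_yammer_metrics_count gauge
kafka_server_kafkaserver_yammer_metrics_count 1234.0
"""
metrics = parse_metrics(sample)
```

Against a running broker, the same function could be fed the response body fetched from the exporter port configured in KAFKA_OPTS.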

Scrape Metrics data in Prometheus

Follow the instructions here to download Prometheus. Add localhost:7071 as the scrape target as shown below.

# my global config
global:
  scrape_interval:     5s # Set the scrape interval to every 5 seconds. Default is every 1 minute.
  evaluation_interval: 5s # Evaluate rules every 5 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's the Kafka JMX exporter.
scrape_configs:
  - job_name: 'kafka-monitoring'
    #metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['localhost:7071']

Run Prometheus using the following command.

 ./prometheus --config.file=prometheus.yml

You should be able to see the following screen on Prometheus for any Kafka metric of your choice. I have randomly picked kafka_server_kafkaserver_yammer_metrics_count as the metric.

Prometheus displaying Kafka Metrics

Display the metrics in Grafana

Download the grafana from here.

Connect the prometheus as the data source as shown below.

Download the dashboard json file from here.

Import the downloaded JSON in Grafana and you will see a functional dashboard.

Kafka Grafana Dashboard


Reliable Data Delivery In Kafka

If you are new to Kafka, please read the first two posts of the series given below. Else dive in. 🙂

Introduction to Kafka

Kafka Internals

What are the guarantees provided by Kafka?

Guarantee of order

Assume that there are two messages: Message A and Message B. Message B is sent after Message A by the same producer. These messages are written to the same partition of a Kafka topic. In this case, Kafka guarantees that the offset of message B is higher than the offset of message A. This also means that a consumer reading from this particular partition will always consume message B after it has consumed message A.

Guarantee of commit

Producer messages are considered committed only when the message is written to the partition in all the in-sync replicas. Producers can ask for acknowledgment at different levels: when the message is sent over the network, when it is written at the leader, or when it is committed. Consumers can only read messages that are committed. Committed messages will not be lost as long as at least one replica remains alive.

These basic guarantees can be used to design a reliable system. However, they alone are not sufficient. We live in a world of practicalities where infrastructure cost is limited (and therefore the number of replicas), traffic to the system is sometimes not predictable (and hence load on individual brokers), and so on and so forth. So, how can we make our system better in terms of reliability?

Reliability at the broker level

Replication Factor

You can create a topic with topic-level configuration called replication.factor. If you choose to not specify the replication factor at the topic level, the topics are created with the default configuration which is specified at the broker level called the default.replication.factor.

A replication factor of N means that you can afford to lose N-1 brokers without affecting the reading and writing of data. A higher replication factor means fewer disasters, but it also requires at least N brokers and brings more infrastructure cost.

But what is the right number of replicas for your Kafka topic?

It really depends on how critical your application is. If you are okay with some data being lost when a broker is restarted, you may work with a replication factor of 1. Banking applications may need a high replica count so as to ensure high availability. One way to think about the replica factor is how costly the unavailability of your topic is. The topic which is processing click streams for sending promotional messages may be less critical than the topic which is processing banking transactions.

Unclean Leader Election

If the leader goes down and there are in-sync replicas available, one of the in-sync replicas will become the leader. But what happens when the leader goes down and there is no in-sync replica?

How can this happen though?

  • Let’s say there are three replicas. Assume two followers go down and the producer continues writing to the leader. This makes the other two replicas out-of-sync. Now, if the leader goes down and one of the followers starts again, we will have an out-of-sync replica as the only available replica.
  • Assume that two followers out of three replicas lag in syncing with the leader. Meanwhile, the leader keeps accepting read and write requests. Now, if the leader goes down, the two followers, one of which would have to become the leader, are both out of sync.

What options do we have now?

  • If we don’t allow these out-of-sync replicas to become the leader, we might have significant downtime which can hurt the business.
  • And if we allow one of these out-of-sync replicas to become the leader, we risk data loss and data inconsistencies. Imagine the two out-of-sync replicas have offsets up to 3000 and the leader has reached offset 4000. Now, if the leader goes down and one of the out-of-sync replicas becomes the leader, new writes on the new leader will start from offset 3001. The old messages at offsets 3001 to 4000 are lost, and consumers who have already read up to 4000 will never read the new data written at offsets 3001 to 4000 on the new leader. This leads to data inconsistency.

The unclean election is the configuration where we allow out-of-sync replicas to become leader. This configuration is done by setting unclean.leader.election.enable flag to true. This flag is set to false where data quality and consistency is critical.
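The offset example above can be replayed with plain Python lists standing in for replica logs. This is a toy model under our own assumptions: the function name and arguments are hypothetical and only mimic the effect of the unclean.leader.election.enable flag.

```python
def elect_leader(follower_log, follower_in_sync, allow_unclean):
    """Return the new leader's log, or None if the partition stays offline."""
    if follower_in_sync or allow_unclean:
        return list(follower_log)
    return None


old_leader = [f"m{i}" for i in range(4001)]  # offsets 0..4000
follower = old_leader[:3001]                 # offsets 0..3000, out of sync

# Clean election only: no eligible replica, so the partition is unavailable.
offline = elect_leader(follower, follower_in_sync=False, allow_unclean=False)

# Unclean election: available again, but offsets 3001..4000 are gone.
new_leader = elect_leader(follower, follower_in_sync=False, allow_unclean=True)
lost = len(old_leader) - len(new_leader)

# The next write reuses offset 3001 with different content.
new_leader.append("new-message")
```

A consumer that had already read offset 3001 from the old leader saw a different message than the one now stored at that offset, which is the inconsistency described above.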

Minimum In-Sync Replicas

As per Kafka reliability guarantees, data is considered committed when it is written to all in-sync replicas, even when "all" means just one replica, in which case the data could be lost if that replica becomes unavailable. To make sure that committed data is written to more than one replica, the minimum number of in-sync replicas needed to certify that the message is indeed committed should be greater than 1. This can be done with a configuration called min.insync.replicas. This configuration can be set at both the topic level and the broker level.

What happens when producers try to send a message when there are not enough in-sync replicas?

Producers will get the NotEnoughReplicasException.
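The check can be pictured with the small simulation below. It is not the broker's code: the exception class and function are stand-ins that we define ourselves, and the check only applies to producers that use acks=all.

```python
class NotEnoughReplicas(Exception):
    """Stand-in for Kafka's NotEnoughReplicasException."""


def append_with_acks_all(in_sync_logs, min_insync_replicas, message):
    """Write `message` to every in-sync replica log, or refuse the write."""
    if len(in_sync_logs) < min_insync_replicas:
        raise NotEnoughReplicas(
            f"{len(in_sync_logs)} in-sync replica(s), need {min_insync_replicas}"
        )
    for log in in_sync_logs:
        log.append(message)
    return len(in_sync_logs)


replica_logs = [[], []]
acked_by = append_with_acks_all(replica_logs, 2, "m1")  # succeeds on 2 replicas
```

If one of the two replicas drops out of sync, the same call with a single log raises NotEnoughReplicas instead of silently committing to one replica.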

Reliability at the producer level

Send Acknowledgments

Producers can choose between three different acknowledgment modes:

  • acks=0 means that a message is considered to be written successfully to Kafka if the producer managed to send it over the network. We will still get an error if the message cannot be serialized, but we won’t get an error if the Kafka cluster is down.
  • acks=1 means that the leader will send either an acknowledgment or an error the moment it gets the message and writes it to the partition data file. We can lose data if the leader crashes and some messages that were successfully written to the leader and acknowledged were not replicated to the followers before the crash.
  • acks=all means that the leader will wait until all in-sync replicas have got the message before sending back an acknowledgment or an error.

Configuring Producer Retries

There are two parts to handling errors in the producer:

  • The errors that the producer handles automatically. Errors like LEADER_NOT_AVAILABLE are retriable, as a leader might be available by the time the request is retried.
  • The errors that you, as the developer using the producer library, must handle. Errors like INVALID_CONFIG cannot be resolved by retrying; the developer has to fix the cause before the message can be sent again.

Retries and careful error handling can guarantee that each message will be stored at least once, but they can’t guarantee that it will be stored exactly once. For this reason, applications often make their messages idempotent, meaning that even if the same message is sent twice, it has no negative impact on correctness.
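Here is a small sketch of why retries create duplicates and how an idempotent consumer copes with them. Everything in it is hypothetical: FlakyBroker is a toy that stores each message but loses some acknowledgments, and the message IDs are our own convention.

```python
class FlakyBroker:
    """Stores every message but loses the first `lost_acks` acknowledgments."""

    def __init__(self, lost_acks):
        self.log = []
        self.lost_acks = lost_acks

    def send(self, message):
        self.log.append(message)
        if self.lost_acks > 0:
            self.lost_acks -= 1
            raise TimeoutError("acknowledgment lost")


def send_with_retries(broker, message, retries=3):
    """Retry on timeout; return the number of attempts that were needed."""
    for attempt in range(1, retries + 2):
        try:
            broker.send(message)
            return attempt
        except TimeoutError:
            continue
    raise RuntimeError("gave up after retries")


def apply_idempotently(log):
    """Sum amounts, applying each message ID only once."""
    seen, balance = set(), 0
    for message_id, amount in log:
        if message_id not in seen:
            seen.add(message_id)
            balance += amount
    return balance


broker = FlakyBroker(lost_acks=1)
attempts = send_with_retries(broker, ("tx-1", 100))
balance = apply_idempotently(broker.log)
```

The message is stored twice (at least once delivery), yet the balance is only credited once because the duplicate ID is skipped.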

Reliability at the consumer level

To ensure data reliability, we need to take care of the following configurations:

The basic idea is that if two consumers have the same group ID and subscribe to the same topic, each will be assigned a subset of the partitions in the topic and will therefore only read a subset of the messages individually but all the messages will be read by the group as a whole.
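A rough picture of how a group splits the work is shown below. This is a simple round-robin split of our own; the actual assignment strategy used by Kafka consumers is configurable and may distribute partitions differently.

```python
def assign_partitions(partitions, consumers):
    """Deal partitions out to the consumers of one group, round-robin."""
    return {
        consumer: partitions[i::len(consumers)]
        for i, consumer in enumerate(consumers)
    }


# Hypothetical group with two consumers reading a five-partition topic.
assignment = assign_partitions([0, 1, 2, 3, 4], ["consumer-1", "consumer-2"])
```

Each consumer reads only its own subset, and together the subsets cover every partition exactly once.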


This can have two possible values – latest and earliest.

When a consumer starts for the first time, it has no committed offset, so it does not know which offset to ask for. In this scenario, if the value of this flag is set to earliest, the consumer will start consuming from the first message available in the partition. On the other hand, if the value of the flag is set to latest, the consumer will start from the end of the partition and read only the messages written after it started.
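The decision can be summed up in a few lines. The function below is an illustration with names of our own choosing, not part of any Kafka client.

```python
def starting_offset(committed, log_start, log_end, reset_policy):
    """Pick the offset a consumer starts from.

    `committed` is None when the group has no committed offset yet.
    """
    if committed is not None:
        return committed  # the reset policy is only a fallback
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    raise ValueError(f"unsupported policy: {reset_policy}")


# Hypothetical partition holding offsets 0..499 (next offset to write is 500).
from_start = starting_offset(None, 0, 500, "earliest")
from_end = starting_offset(None, 0, 500, "latest")
```

Once the group has committed an offset, the policy no longer matters and the consumer resumes from the committed position.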

Offset has to be committed. Period.

The real question is when?

If we set the flag to true, the consumer will automatically commit the offset after the time interval specified in the flag. This works fine until an offset is committed for a message that has been read but not yet processed; if the consumer stops at that point, that message is never processed. In general, committing more frequently adds some overhead but reduces the number of duplicates that can occur when a consumer stops. As a general rule, we should always commit offsets after events are processed.
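The difference between committing before and after processing can be seen in a tiny simulation. The consume function is a toy of our own, and the crash is simulated by stopping at a chosen offset.

```python
def consume(messages, committed, crash_at, commit_before_processing):
    """Process messages from `committed` until a simulated crash.

    Returns (processed messages, committed offset at the time of the crash).
    """
    processed = []
    for offset in range(committed, len(messages)):
        if commit_before_processing:
            committed = offset + 1
        if offset == crash_at:
            break  # the consumer dies while handling this message
        processed.append(messages[offset])
        if not commit_before_processing:
            committed = offset + 1
    return processed, committed


messages = ["a", "b", "c", "d"]

# Commit first, then crash on "c": the restart skips "c" for good.
first, resume_at = consume(messages, 0, crash_at=2, commit_before_processing=True)
early = first + consume(messages, resume_at, None, True)[0]

# Commit after processing: the restart picks "c" up again.
first, resume_at = consume(messages, 0, crash_at=2, commit_before_processing=False)
late = first + consume(messages, resume_at, None, False)[0]
```

Committing after processing can still produce a duplicate if the consumer stops between processing and committing, which is why idempotent handling matters.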

Handling Long Processing Times

In some versions of the Kafka consumer, we can’t stop polling for more than a few seconds. Even if we don’t want to process additional records, we must continue polling so the client can send heartbeats to the broker. A common pattern in these cases is to hand off the data to a thread pool, with multiple threads when possible, to speed things up by processing in parallel. After handing off the records to the worker threads, we can pause the consumer and keep polling without actually fetching additional data until the worker threads finish. Once they are done, we can resume the consumer. Because the consumer never stops polling, the heartbeat will be sent as planned and rebalancing will not be triggered.

Set up your first producer and consumer by following the tutorial here.


Kafka Internals: How does Kafka store the data?

A quick recap of the basic terminology of Kafka can be found here.

The fundamental unit that Kafka manages is called a message. A message is simply a byte array without any restriction on its format. As you would know, producers produce data to a topic. Each topic is further divided into multiple partitions. Messages are sent from the producer to Kafka in batches. A batch is a collection of messages that are produced to a particular topic and partition. This batching is done in order to avoid a round trip across the network for each individual message.

But why is the data in a topic stored in multiple partitions?

Imagine partition as a file. Each message is appended at the end of the file. Messages are read from the start to the end of the file. Partitions can be distributed across the brokers. Since messages are produced to different partitions of a topic, one can not expect the guarantee of time ordering across all messages. However, messages in one partition have a guarantee of time ordering.
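A toy model makes the ordering guarantee easier to see. The Topic class below is our own sketch, and it uses crc32 as a stand-in hash; Kafka's default partitioner for keyed messages uses murmur2.

```python
import zlib


class Topic:
    """Toy topic: each partition is an append-only list."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def produce(self, key, value):
        """Append to the partition chosen by hashing the key.

        Returns (partition, offset) of the stored message.
        """
        partition = zlib.crc32(key.encode()) % len(self.partitions)
        self.partitions[partition].append(value)
        return partition, len(self.partitions[partition]) - 1


topic = Topic(num_partitions=3)
p_a, offset_a = topic.produce("user-1", "A")
p_b, offset_b = topic.produce("user-1", "B")
```

Messages with the same key land in the same partition with increasing offsets, so they are read back in the order they were written. Messages with different keys may land in different partitions, where no relative order is guaranteed.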

Partitions are the way Kafka ensures scalability and redundancy. Each partition can be hosted on a different server, which means that a single topic can be scaled horizontally across multiple servers to provide performance far beyond the ability of a single server. This effectively means that when the message count is low, you can have all your partitions on one broker. On the other hand, if the message count and size increase, the partitions can be spread out until each server hosts just one partition. A partition cannot be split between multiple brokers, and not even between multiple disks on the same broker. Therefore, the size of a partition is limited by the space available on a single mount point.

Replication is the core of Kafka. Each partition is replicated for redundancy and for the inevitable failure of hardware.

How does Kafka handle replication?

There are two types of replica: leader replicas and follower replicas. Let’s say there are three replicas of a partition. One of them is the leader. All requests from producers and consumers go through the leader in order to guarantee consistency. All the replicas other than the leader are called followers. Followers do not serve any requests; their only task is to keep themselves up to date with the leader. One of the follower replicas becomes the leader in case the leader fails.

How does a follower stay in sync with the leader?

A follower sends FETCH requests to the leader. Each request carries the offset of the message that the follower wants to receive next, and the requests are always made in order. A replica will request message 1, then message 2, and then message 3, and it will not request message 4 before it gets messages 1, 2 and 3. Therefore the leader knows that a replica got all messages up to message 3 when the replica requests message 4. The leader knows how far behind each replica is by looking at the last offset requested by each replica. A replica is considered out of sync if it hasn’t requested a message in more than ten seconds. An out-of-sync replica cannot become leader.
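The leader's bookkeeping can be sketched as below. LeaderView is a hypothetical class of our own, time is passed in explicitly to keep the example deterministic, and the ten-second limit mirrors the figure quoted above.

```python
class LeaderView:
    """Tracks the last FETCH request seen from each follower."""

    def __init__(self, max_lag_seconds=10.0):
        self.max_lag_seconds = max_lag_seconds
        self.last_fetch = {}  # replica id -> (requested offset, time of request)

    def on_fetch(self, replica, offset, now):
        self.last_fetch[replica] = (offset, now)

    def received_up_to(self, replica):
        """A fetch for offset N implies the replica has everything below N."""
        return self.last_fetch[replica][0] - 1

    def in_sync(self, now):
        """Replicas that fetched within the allowed lag window."""
        return sorted(
            replica
            for replica, (_offset, seen) in self.last_fetch.items()
            if now - seen <= self.max_lag_seconds
        )


view = LeaderView()
view.on_fetch(replica=2, offset=4, now=0.0)
view.on_fetch(replica=3, offset=4, now=0.0)
view.on_fetch(replica=2, offset=9, now=8.0)  # replica 3 has gone quiet
```

At time 12, replica 2 last fetched 4 seconds ago and stays in sync, while replica 3 has been silent for 12 seconds and drops out.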

What protocol does Kafka use for communication? Is it HTTP?

Kafka does not use HTTP. Kafka uses a binary protocol over TCP. The protocol defines all APIs as request and response message pairs. The client initiates a socket connection and then writes a sequence of request messages and reads back the corresponding response messages. The client will likely need to maintain connections to multiple brokers, as data is partitioned and the clients will need to talk to the server that has their data.

The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. The broker’s request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. That is, clients can send requests even while awaiting responses for preceding requests, since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server.

The server has a configurable maximum limit on request size and any request that exceeds this limit will result in the socket being disconnected.

Example of FETCH Request

What do Kafka developers have to say about their choice of communication protocol?

Screenshot from Kafka documentation

Physical Storage

Let’s assume that we have the following scenario:

Number of broker: 3

One topic with 9 partitions and a replication factor of 2. That means we have a total of 18 partition replicas.

Now, Kafka would try to assign 6 replicas (18/3) to each broker so that no two replicas of the same partition are on the same broker. If rack information is available, it is also considered when allocating partitions to brokers.
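A simplified round-robin placement reproduces these numbers. This is our own sketch and not Kafka's actual algorithm, which also randomizes the starting broker and can take racks into account; the broker IDs are hypothetical.

```python
from collections import Counter


def assign_replicas(num_partitions, replication_factor, brokers):
    """Place each partition's replicas on consecutive brokers, round-robin.

    Assumes replication_factor <= len(brokers), so that the replicas of a
    partition always land on distinct brokers.
    """
    return {
        partition: [
            brokers[(partition + r) % len(brokers)]
            for r in range(replication_factor)
        ]
        for partition in range(num_partitions)
    }


assignment = assign_replicas(9, 2, brokers=[0, 1, 2])
load = Counter(b for replicas in assignment.values() for b in replicas)
```

Each of the three brokers ends up with 6 of the 18 replicas, and the first broker in each list can be read as the preferred leader for that partition.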

How does Kafka retain a message for the specified duration?

Each topic has a specified retention period or size after which the messages are deleted, even if not all the consumers have read them. Kafka splits each partition into segments. By default, each segment contains either 1 GB of data or a week of data, whichever is smaller. As a Kafka broker is writing to a partition, if the segment limit is reached, the broker closes the file and starts a new one. The segment currently being written to is called the active segment. The active segment is never deleted, so if you set log retention to only store a day of data but each segment contains five days of data, you will really keep data for five days, because the data can’t be deleted before the segment is closed. If you choose to store data for a week and roll a new segment every day, you will see that every day a new segment is rolled while the oldest segment is deleted, so most of the time the partition will have seven segments.
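Both scenarios can be checked with a small model. It is a simplification under our own assumptions: time is counted in whole days, a segment is a (first day, last day) pair, and a closed segment is dropped once its newest data is at least as old as the retention period.

```python
def retained_segments(segments, now_day, retention_days):
    """Return the segments that survive a retention pass.

    `segments` is ordered oldest first and the last entry is the active
    segment, which is never deleted.
    """
    *closed, active = segments
    kept = [seg for seg in closed if now_day - seg[1] < retention_days]
    return kept + [active]


# One segment rolled per day for ten days, one week of retention.
daily = [(day, day) for day in range(10)]
after_week_policy = retained_segments(daily, now_day=9, retention_days=7)

# One day of retention, but the only (active) segment spans five days.
single = retained_segments([(0, 4)], now_day=4, retention_days=1)
```

The first case leaves seven segments. In the second case the five-day segment stays untouched because it is still active.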

What is the file format of each segment?

Each segment is stored in a single data file. Inside the file, we store Kafka messages and their offsets. The format of the data on the disk is identical to the format of the messages that we send from the producer to the broker and later from the broker to the consumers. Using the same message format on disk and over the wire is what allows Kafka to use zero-copy optimization when sending messages to consumers and also avoid decompressing and recompressing messages that the producer already compressed. In addition to its key, value, and offset, each message contains things like the message size, a checksum code that allows us to detect corruption, a magic byte that indicates the version of the message format, the compression codec (Snappy, GZip, or LZ4), and a timestamp.

What are indexes in Kafka?

Kafka allows consumers to start fetching messages from any available offset. This means that if a consumer asks for 1 MB of messages starting at offset 100, the broker must be able to quickly locate the message for offset 100 (which can be in any of the segments for the partition) and start reading the messages from that offset on. In order to help brokers quickly locate the message for a given offset, Kafka maintains an index for each partition. The index maps offsets to segment files and positions within the file.
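The lookup can be approximated with two binary searches: one to find the segment and one to find the nearest indexed position inside it. The data below is hypothetical, and we assume that segments are identified by their first offset and that the index is sparse, so the broker scans forward from the returned position.

```python
import bisect

# Hypothetical partition: three segments, named by their first offset.
segment_base_offsets = [0, 1000, 2000]

# Hypothetical sparse index per segment: (offset, byte position) pairs.
sparse_index = {
    0: [(0, 0), (40, 4096), (85, 8192)],
    1000: [(1000, 0)],
    2000: [(2000, 0)],
}


def locate(offset):
    """Return (segment base offset, byte position to start scanning from)."""
    seg_pos = bisect.bisect_right(segment_base_offsets, offset) - 1
    segment = segment_base_offsets[seg_pos]
    entries = sparse_index[segment]
    # Last index entry whose offset is <= the requested offset.
    entry_pos = bisect.bisect_right(entries, (offset, float("inf"))) - 1
    return segment, entries[entry_pos][1]


where = locate(100)
```

For offset 100, the search lands in the segment starting at offset 0 and begins scanning at byte 8192, the position of the closest indexed offset (85).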

What is compaction in Kafka?

Kafka allows the retention policy on a topic to be set to compact, which stores only the most recent value for each key in the topic. Setting the policy to compact only makes sense on topics for which applications produce events that contain both a key and a value. If the topic contains null keys, compaction will fail.
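The effect of compaction on a log can be shown with a short function. This is an in-memory illustration of the idea, not how the broker's log cleaner is implemented, and the records are made up.

```python
def compact(log):
    """Keep only the latest record per key, in original offset order.

    `log` is a list of (key, value) pairs; the list index is the offset.
    """
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)
    return sorted((offset, key, value) for key, (offset, value) in latest.items())


# Hypothetical topic holding the current email address of each user.
log = [("user-1", "a@example.com"), ("user-2", "b@example.com"), ("user-1", "c@example.com")]
compacted = compact(log)
```

The older value for user-1 at offset 0 disappears, while the surviving records keep their original offsets.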

Set up your first producer and consumer by following the tutorial here.


Kafka: The Definitive Guide

Kafka Documentation: Protocol
