Zero To One – Ready To Kickstart Your Software Engineering Career?
This course is designed for someone who has a basic understanding of coding. The course intends to provide a flavour of a little bit of everything. There are also links to Leetcode problems that you should solve. The Leetcode problems are selected to build your familiarity with different data structures and algorithms.
Please register for the course using the form here:
Lecture 1: How to approach a low-level design
Why system design? Because to solve any industry-scale problem in the tech domain, you need to understand system design.
Study Resources
Overview in slides
Lecture 2: Git
Git is a version control system. Heard of Linus Torvalds? He is the main developer of Linux (notice the similarity of the names Linus and Linux). Linus developed Git to help him in the development of Linux. Git has become so popular that it is now a synonym for version control system, or VCS in short.
Lecture 3: SQL
Now that you know about programming, it's time to learn a bit about databases. SQL, or Structured Query Language, was initially developed at IBM. We will also cover JDBC, which is used to connect to SQL databases from Java.
Study Resources
Overview in slides.
Lecture 4: Web Server
An HTTP server is software that understands URLs (web addresses) and HTTP (the protocol your browser uses to view webpages).
Study Resources
Overview in slides.
Lecture 5: Docker
Docker allows developers to build, package, and deploy applications and services as lightweight, portable containers.
Study Resources
Overview in slides.
Lecture 6: Cloud
The cloud is the Internet—more specifically, it’s all of the things you can access remotely over the Internet.
Caches are a data storage layer used to avoid operations that are expensive or hard to compute.
The following examples can be characterised as expensive operations:
Data that has to be fetched from a database, resulting in network calls and usage of precious database resources.
Data that is calculated using the responses from multiple API calls to different services.
Cryptographic operations that take a lot of computing resources.
Essentially, caches are like a water jug kept on the dining table. The jug is replenished from the tap when it is empty. Having a jug saves multiple people the effort of going to fetch water.
Similarly, as developers, we choose to store the results of expensive operations in a cache and evict the data depending on a specified eviction policy. The eviction policy could be of multiple kinds – LRU, LFU and time-based eviction.
Caches are usually a combination of two components:
Storage layer to store the key-value pair.
Eviction layer to remove the data after the eviction condition is met. In our case, we are going to use time-based eviction.
Now, we will see how the library has implemented the two components. We will start with the data structures.
Item
type Item struct {
	Object     interface{}
	Expiration int64
}
Item is used to store the value part of a key-value pair. The Item struct has two fields. Object is an interface{}; using an interface allows us to store any kind of value. Expiration stores the Unix time (in nanoseconds) after which the key-value pair should no longer be visible.
Janitor
type janitor struct {
	Interval time.Duration
	stop     chan bool
}
Think of the janitor like a janitor in real life. The janitor struct has two fields. Interval is the duration after which the janitor periodically comes to clean up the storage; it purges data that has expired. stop is a channel used to inform the janitor that cleaning is no longer needed.
cache
type cache struct {
	defaultExpiration time.Duration
	items             map[string]Item
	mu                sync.RWMutex
	onEvicted         func(string, interface{})
	janitor           *janitor
}
The cache struct lies at the heart of this library. It has the following fields:
defaultExpiration – Time after which a key-value (KV in short) pair expires, if the pair is not set with its own expiry time.
items is a map with string as key and Item as the value.
mu is a lock. Since Go maps are not thread-safe, we cannot guarantee the behaviour of a map under concurrent writes. The lock ensures that the map is not written to by two different goroutines at the same time. sync.RWMutex offers two kinds of lock – Lock() and RLock(). Lock() allows only one goroutine to read and write at a time. RLock() allows multiple goroutines to read, but not write, at the same time (see the sketch after this list). Read more about it here – go – what is the difference between RLock() and Lock() in Golang? – Stack Overflow
onEvicted is a function passed by the user, which acts as a callback. This function is called whenever data is evicted. One use could be to replenish the cache from the database to prevent the data from getting stale.
janitor – the janitor, as described above, is a cleaner that periodically purges expired data from the cache after a specified duration.
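Here is a tiny, self-contained sketch (hypothetical names, not part of the library) showing the two kinds of locking on a map:

package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu   sync.RWMutex
	data map[string]int
}

// read takes the shared lock: many readers can hold RLock at once.
func (s *store) read(k string) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.data[k]
}

// write takes the exclusive lock: no other reader or writer can run.
func (s *store) write(k string, v int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[k] = v
}

func main() {
	s := &store{data: map[string]int{}}
	s.write("a", 1)
	fmt.Println(s.read("a"))
}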
Cache
type Cache struct {
	*cache
}
A field declared with a type but no explicit field name is an anonymous field, also called an embedded field or an embedding of the type in the struct. An embedded type must be specified as a type name T or as a pointer to a non-interface type name *T, and T itself may not be a pointer type. The unqualified type name acts as the field name.
But the important question is: why does this struct have just a cache pointer? We will find the answer later.
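As a quick aside, here is a minimal, standalone illustration (hypothetical types, unrelated to the library) of how an embedded field promotes methods:

package main

import "fmt"

type engine struct{}

func (e *engine) Start() { fmt.Println("started") }

// Car embeds *engine, so Start is promoted to Car.
type Car struct {
	*engine
}

func main() {
	c := Car{&engine{}}
	c.Start()        // promoted method
	c.engine.Start() // the unqualified type name acts as the field name
}

Because *cache is embedded in Cache the same way, every method defined on *cache can be called directly on a Cache value.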
Now that we have seen the important structs, let’s have a look at the important methods.
New(…)
This is the method developers use to instantiate a new cache. It has two parameters:
defaultExpiration – the default TTL of a KV pair, if the pair is not set with its own TTL.
cleanupInterval – the time interval after which the janitor purges expired data.
The method returns a pointer to Cache. Internally, it calls the newCacheWithJanitor method.
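The body of New is not reproduced in this post; based on the description above, it is essentially a thin wrapper along these lines (a sketch, not necessarily the library's exact code):

func New(defaultExpiration, cleanupInterval time.Duration) *Cache {
	items := make(map[string]Item)
	return newCacheWithJanitor(defaultExpiration, cleanupInterval, items)
}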
func newCacheWithJanitor(de time.Duration, ci time.Duration, m map[string]Item) *Cache {
	c := newCache(de, m)
	C := &Cache{c}
	if ci > 0 {
		runJanitor(c, ci)
		runtime.SetFinalizer(C, stopJanitor)
	}
	return C
}
newCacheWithJanitor initialises c, a struct of type cache, and then wraps it in a Cache. The runJanitor method starts a goroutine on c: a ticker is initialised with the purge interval, and a select waits on the ticker's channel for ticks to be delivered. Each time a tick arrives, the DeleteExpired method is called. Tickers are used when you want to do something repeatedly at regular intervals; they are part of Go's standard time package.
func (j *janitor) Run(c *cache) {
	ticker := time.NewTicker(j.Interval)
	for {
		select {
		case <-ticker.C:
			c.DeleteExpired()
		case <-j.stop:
			ticker.Stop()
			return
		}
	}
}
func runJanitor(c *cache, ci time.Duration) {
	j := &janitor{
		Interval: ci,
		stop:     make(chan bool),
	}
	c.janitor = j
	go j.Run(c)
}
Since the janitor goroutine holds a reference to c, the cache object, c would never become available for garbage collection while the goroutine runs. This is why the Cache struct is designed with cache as an embedded field: when the user's Cache value becomes unreachable and is garbage collected, stopJanitor is called via runtime.SetFinalizer, the goroutine exits, and c can be collected too. runtime.SetFinalizer registers a function (here stopJanitor) to be called when its first operand (here C) is about to be garbage collected.
Add(…)
func (c *cache) Add(k string, x interface{}, d time.Duration) error {
	c.mu.Lock()
	_, found := c.get(k)
	if found {
		c.mu.Unlock()
		return fmt.Errorf("Item %s already exists", k)
	}
	c.set(k, x, d)
	c.mu.Unlock()
	return nil
}
func (c *cache) set(k string, x interface{}, d time.Duration) {
	var e int64
	if d == DefaultExpiration {
		d = c.defaultExpiration
	}
	if d > 0 {
		e = time.Now().Add(d).UnixNano()
	}
	c.items[k] = Item{
		Object:     x,
		Expiration: e,
	}
}
The Add method adds a key-value pair if it is not already stored, or if the stored pair has expired. c.mu.Lock() allows only one goroutine to access the code block that it locks. This is important because the items map in cache is not thread-safe. We store the expiry time as Unix time in nanoseconds.
The Get method is used to fetch the value for a given key, if it is available in the cache. Please notice that the Get method uses RLock(), which allows multiple goroutines to read the locked code block concurrently, but not write.
A key-value pair is not removed immediately after its expiry time passes. The DeleteExpired() method runs periodically at the purge interval and deletes expired key-value pairs. Until then, Get checks for expiry by comparing the current time with the expiry time.
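The body of Get is not reproduced in this post either; a sketch consistent with the structs above (not necessarily the library's exact code) looks like this:

func (c *cache) Get(k string) (interface{}, bool) {
	c.mu.RLock()
	item, found := c.items[k]
	if !found {
		c.mu.RUnlock()
		return nil, false
	}
	// Lazy expiry check: the janitor may not have purged this key yet.
	if item.Expiration > 0 && time.Now().UnixNano() > item.Expiration {
		c.mu.RUnlock()
		return nil, false
	}
	c.mu.RUnlock()
	return item.Object, true
}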
func (c *cache) DeleteExpired() {
	var evictedItems []keyAndValue
	now := time.Now().UnixNano()
	c.mu.Lock()
	for k, v := range c.items {
		// "Inlining" of expired
		if v.Expiration > 0 && now > v.Expiration {
			ov, evicted := c.delete(k)
			if evicted {
				evictedItems = append(evictedItems, keyAndValue{k, ov})
			}
		}
	}
	c.mu.Unlock()
	for _, v := range evictedItems {
		c.onEvicted(v.key, v.value)
	}
}
Implement your own cache with time based eviction
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

const (
	INFINITY = -1 // never expire
	DEFAULT  = 0  // fall back to the cache's default expiry
)
func main() {
	fmt.Println("Hello World")
	cache := New(10*time.Hour, 20*time.Minute)
	fmt.Println(cache.defaultExpiryDuration)
	fmt.Println(cache.kvstore)
	cache.Set("foo", "bar", 2*time.Minute)
	fmt.Println(cache.kvstore)
	value, found := cache.Get("foo")
	if found {
		fmt.Println("Value is ", value)
	}
}
type Data struct {
	Value    interface{}
	ExpireAt int64 // Unix nanoseconds; 0 means the entry never expires
}

type Cleaner struct {
	Interval time.Duration
	stop     chan bool
}

type cache struct {
	defaultExpiryDuration time.Duration
	kvstore               map[string]Data
	locker                sync.RWMutex
	cleaner               *Cleaner
	onRemoval             func(string, interface{})
}

type Cache struct {
	*cache
}
func New(defaultExpiryDuration time.Duration, cleanUpInterval time.Duration) *Cache {
	if defaultExpiryDuration == 0 {
		defaultExpiryDuration = INFINITY
	}
	cache := &cache{
		defaultExpiryDuration: defaultExpiryDuration,
		kvstore:               make(map[string]Data),
	}
	Cache := &Cache{cache}
	if cleanUpInterval > 0 {
		clean(cleanUpInterval, cache)
		runtime.SetFinalizer(Cache, stopCleaning)
	}
	return Cache
}
func clean(cleanUpInterval time.Duration, cache *cache) {
	cleaner := &Cleaner{
		Interval: cleanUpInterval,
		stop:     make(chan bool),
	}
	cache.cleaner = cleaner
	go cleaner.Cleaning(cache)
}
func (c *Cleaner) Cleaning(cache *cache) {
	ticker := time.NewTicker(c.Interval)
	for {
		select {
		case <-ticker.C:
			cache.purge()
		case <-c.stop:
			ticker.Stop()
			// Return so the goroutine exits; otherwise it would
			// keep running after the stop signal.
			return
		}
	}
}
func stopCleaning(cache *Cache) {
	cache.cleaner.stop <- true
}
func (cache *cache) purge() {
	now := time.Now().UnixNano()
	// Take the write lock: purge mutates the map and may run
	// concurrently with Set and Get.
	cache.locker.Lock()
	defer cache.locker.Unlock()
	for key, data := range cache.kvstore {
		// ExpireAt == 0 means the entry never expires, so skip it.
		if data.ExpireAt > 0 && data.ExpireAt < now {
			delete(cache.kvstore, key)
		}
	}
}
func (c *cache) Set(key string, value interface{}, expiryDuration time.Duration) {
	if expiryDuration == DEFAULT {
		expiryDuration = c.defaultExpiryDuration
	}
	var expireAt int64
	if expiryDuration > 0 {
		expireAt = time.Now().Add(expiryDuration).UnixNano()
	}
	c.locker.Lock()
	defer c.locker.Unlock()
	c.kvstore[key] = Data{
		Value:    value,
		ExpireAt: expireAt,
	}
}
func (c *cache) Get(key string) (interface{}, bool) {
	c.locker.RLock()
	defer c.locker.RUnlock()
	data, found := c.kvstore[key]
	if !found {
		return nil, false
	}
	// Only treat the entry as expired if it actually has an expiry time.
	if data.ExpireAt > 0 && data.ExpireAt < time.Now().UnixNano() {
		return nil, false
	}
	return data.Value, true
}
Zero To One – Ready To Kickstart Your Software Engineering Career?
This course is designed for someone who has basic to no understanding of coding. The course intends to provide a flavour of a little bit of everything. There are also links to Leetcode problems that you should solve. The Leetcode problems are selected to build your familiarity with different data structures and algorithms.
Please register for the course using the form here:
Lecture 1 – Basic Java
Why Java? Because Java is an easy Object Oriented Programming (OOP) language that is used in many companies across the globe.
Study Resources
The following link contains three slide decks. These slides introduce the reader to Java and fundamental datatypes. Once you complete them, please make sure that you attempt the problem statements in the lab given below.
The questions here have to be solved on Leetcode. Leetcode is a platform commonly used to improve data structure and algorithm skills. The intention behind these problems is to build your familiarity with different data structures and algorithms.
Make a console-based application that supports CRUD (Create, Read, Update, Delete) operations for the e-commerce domain.
The application should be able to support the following features:
Any user should be able to sign up, log in and log out.
Logged-in users should be able to browse products.
Logged-in users should have a shopping cart to which they can add multiple products.
Users should have the ability to check out, and the total payable amount should be displayed during checkout.
User should have the following attributes: name, user id, address, date of birth.
The product should have the following attributes: name, product id, description, and price.
User and Product information should be persisted in memory.
The console should have an option for all the operations mentioned above.
Push it to your Git repository on GitHub.
Lecture 4: SQL
Now that you know about programming, it's time to learn a bit about databases. SQL, or Structured Query Language, was initially developed at IBM. It is used to query and manage data in an RDBMS.
You have completed a web application on your local computer. It's time to deploy it on the cloud. AWS allows you to use some of its services for free; we will use that free tier.
A data stream is an abstraction representing an unbounded dataset. Unbounded means that the data is infinite and grows over time, as new records keep getting added to the dataset. The data contained in the events and the number of events per second differ from system to system: events can be tiny (sometimes only a few bytes) or very large (XML messages with many headers); they can also be completely unstructured, key-value pairs, semi-structured JSON, or structured Avro or Protobuf messages.
What are examples of data streams?
Every business transaction can be seen as a stream of events. Think about the case when you make a payment through your favourite mobile wallet app. We can summarise the business transaction as the following set of events:
Open the app.
Authenticate through your biometric details or enter the pass code.
Scan the QR code or the wallet Id of the receiver.
Enter the amount to be transferred.
Enter the secure code.
Get the payment confirmation screen.
Just like this, every other business transaction can be modelled as a sequence of events. Think of stock trades, package deliveries, network events going through a switch, events reported by sensors in manufacturing equipment, emails sent, moves in a game: all of these are essentially streams of events.
What are the properties of data stream?
Event streams are ordered – Events are ordered with respect to time. We can say with confidence that event X occurred after event Y. A business transaction where the first event is a $1,000 credit and the second is a $1,000 debit to a bank account is different from one where the debit occurs first and the credit happens next. The second transaction involves overdraft charges, whereas the first is a fairly normal one.
Immutable data records – Events can never be modified after they have occurred. A cancelled financial transaction does not disappear. Rather, another event records the cancellation of the previous transaction.
Event streams are replayable – This is a desirable property. For the majority of business applications, it is critical to be able to replay a raw stream of events that occurred months (and sometimes years) earlier. This is required in order to correct errors, try new methods of analysis, or perform audits.
Stream-Processing
Stream processing fills the gap between the request-response world, where we wait for events that take two milliseconds to process, and the batch-processing world, where data is processed once a day and takes many hours to complete. Many business processes do not fit either request-response or batch processing. They need something that continuously reads data from an unbounded dataset, does something to it, and emits output, which can then be presented as a report to the end user or stored in a database as some business property. The processing has to be continuous and ongoing.
Stream-Processing Concepts
Stream processing is just like any other kind of data processing, where we:
a. Get the data.
b. Transform the data.
c. Aggregate the data.
d. Store the data.
e. Present the data.
However, there are some key concepts that are useful for developing any stream application.
Time
In the context of stream processing, having a common notion of time is critical because most stream applications perform operations on time windows. For example, our stream application might calculate a moving five-minute count of total orders placed. In that case, we need to know what to do when one of our data servers goes offline for two hours due to some issue and returns with two hours' worth of data: most of that data will be relevant for five-minute time windows that have long passed and for which the result was already calculated and stored.
Stream-processing frameworks have the following notions of time:
Event time
This is the time when the event happened. For example, when any user visits our website, that time is the event time for that event. Event time is usually the time that matters most when processing stream data.
Processing time
This is the time at which the stream-processing application received the event in order to perform some calculation. This can be milliseconds, hours, or days after the event occurred. This notion of time is highly unreliable and best avoided.
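To make the difference concrete, here is a minimal Go sketch (the Event type and its fields are made up for illustration) that buckets events into five-minute windows by event time, so that a late-arriving event still lands in the window in which it actually happened:

package main

import (
	"fmt"
	"time"
)

// Event carries its own event time, stamped when the event occurred.
type Event struct {
	UserID    string
	EventTime time.Time // event time; arrival (processing) time is deliberately ignored
}

// windowKey assigns an event to a five-minute window based on event time.
func windowKey(e Event) time.Time {
	return e.EventTime.Truncate(5 * time.Minute)
}

func main() {
	counts := map[time.Time]int{}
	events := []Event{
		{UserID: "a", EventTime: time.Date(2023, 1, 1, 10, 2, 0, 0, time.UTC)},
		{UserID: "b", EventTime: time.Date(2023, 1, 1, 10, 7, 0, 0, time.UTC)},
	}
	for _, e := range events {
		counts[windowKey(e)]++ // a late arrival still increments the right window
	}
	fmt.Println(counts)
}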
State
Stream processing of a single event may be easy, but stream processing is usually more involved than that. It usually contains the following operations (but is not limited to them):
Counting the number of events by type
Moving averages over a five-minute window
Joining two streams to create an enriched stream of information
Aggregating data over an hour
Sum, average, quantile over data
We call the information that is stored between events a state. State can be of the following two kinds:
Internal State
State that is accessible only by a specific instance of the stream-processing application. This state is usually maintained and managed with an embedded in-memory database running within the application, which makes access very fast. However, since it is in-memory, we are limited by the amount of data it can store. As a consequence, stream processing is sometimes done by splitting the data into several sub-streams so that each can be processed using internal state.
External State
State that is stored in an external datastore such as Cassandra is called external state. The advantage of this kind of state is virtually unlimited memory and accessibility from anywhere. However, being external means that we have to bear extra latency and the added complexity of an external system.
Stream-Table Duality
A database table allows checking the state of the data at a specific point in time. Unlike tables, streams contain a history of changes. Streams are a string of events wherein each event caused a change. A table contains a current state of the world, which is the result of many changes.
Let’s assume that we are tracking the events at an ATM. The following events could happen:
The bank stocks the ATM with $10,000 at 10:00 AM.
Person A withdraws $10K at 10:05 AM.
Person B withdraws $1K at 11:05 AM.
Person C withdraws $2K at 12:05 PM.
Person D withdraws $3K at 12:08 PM.
Person E withdraws $4K at 03:05 PM.
The database would tell us the balance in the ATM at any point in time. The stream would tell us how busy the ATM is, and which is its busiest hour. The stream and the database are two views of the same business transaction.
Stream Processing Design Pattern
Single-Event Processing
Here, the stream-processing framework consumes a single message, does some data manipulation, and then writes the output to another stream. An example could be a framework that checks the fraud_probability of each event and puts high-risk events into a stream that sends an email to the end user.
Processing with Local State
Most stream-processing applications are concerned with aggregating information, especially time-window aggregation. An example could be finding the busiest hour of the website in order to scale the infrastructure. These aggregations require maintaining a state for the stream. As in our example, in order to calculate the number of website hits in an hour, we need a counter that tracks hits in a moving one-hour window. This can be done in a local state.
Assume we want to find hits on the different pages of a website; we can partition the stream by page and then aggregate each partition using a local state, as the sketch after this section shows.
However, local state has to fit in memory, and it should be persisted so that if the infrastructure crashes, we are able to recover the state.
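A minimal sketch of such a local state (all names are hypothetical): an in-memory, per-page hit counter over hourly windows. A real framework would also checkpoint this state somewhere durable so it can be recovered after a crash:

package main

import (
	"fmt"
	"time"
)

// hit is a single page-view event.
type hit struct {
	Page string
	At   time.Time
}

// localState keeps per-page counts per hourly window, entirely in memory.
type localState struct {
	counts map[string]map[time.Time]int // page -> window start -> hits
}

func (s *localState) record(h hit) {
	window := h.At.Truncate(time.Hour)
	if s.counts[h.Page] == nil {
		s.counts[h.Page] = map[time.Time]int{}
	}
	s.counts[h.Page][window]++
}

func main() {
	s := &localState{counts: map[string]map[time.Time]int{}}
	s.record(hit{Page: "/home", At: time.Now()})
	s.record(hit{Page: "/home", At: time.Now()})
	fmt.Println(s.counts)
}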
Multiphase Processing
Multiphase processing incorporates the following phases:
Aggregate data using a local state
Publish the data into a new stream
Aggregate the data from the new stream mentioned in phase 2.
This type of multiphase processing is very familiar to those who write map-reduce code, where you often have to resort to multiple reduce phases.
Stream-Table Join
Sometimes stream processing requires integrating data external to the stream: validating transactions against a set of rules stored in a database, or enriching clickstream information with data about the users who clicked.
Now, making an external database call for every event would mean not only extra latency but also additional load on the database. Another constraint is that, on similar infrastructure, the number of events a streaming platform can process is orders of magnitude higher than what a database can handle, so this is clearly not a scalable solution. Caching can be one strategy, but caching the data means one needs to manage the cache infrastructure and the data lifecycle. For example, how would you make sure that the cached data is not stale? One solution is to ensure that the database changes are themselves streamed, and the cache is updated based on the data in that change stream.
Streaming Join
For example, let’s say that we have one stream with search queries that people entered into our website and another stream with clicks, which include clicks on search results. We want to match search queries with the results people clicked on, so that we know which result is most popular for which query. When you join two streams, you are joining the entire history: trying to match events in one stream with events in the other stream that have the same key and happened in the same time windows. This is why a streaming join is also called a windowed join.
Joining two streams of events over a moving time window
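A toy sketch of the idea (hypothetical types; a real framework would keep windowed state per key and handle out-of-order arrivals): match each click to a search from the same session that happened within the join window:

package main

import (
	"fmt"
	"time"
)

type search struct {
	SessionID string
	Query     string
	At        time.Time
}

type click struct {
	SessionID string
	Result    string
	At        time.Time
}

// A click matches a search from the same session if it happened
// within one minute after the search.
const joinWindow = time.Minute

func joinStreams(searches []search, clicks []click) {
	recent := map[string]search{} // windowed state: latest search per session
	for _, s := range searches {
		recent[s.SessionID] = s
	}
	for _, c := range clicks {
		if s, ok := recent[c.SessionID]; ok && c.At.After(s.At) && c.At.Sub(s.At) <= joinWindow {
			fmt.Printf("query %q led to a click on %q\n", s.Query, c.Result)
		}
	}
}

func main() {
	now := time.Now()
	joinStreams(
		[]search{{SessionID: "s1", Query: "kafka", At: now}},
		[]click{{SessionID: "s1", Result: "kafka.apache.org", At: now.Add(10 * time.Second)}},
	)
}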
Out-of-Sequence Events
Handling events that arrive at the stream at the wrong time is a challenge not just in stream processing but also in traditional ETL systems. For example, an Uber driver's mobile device loses signal for a few minutes and then sends a few minutes' worth of events when it reconnects.
In such a scenario, the framework has to do the following things:
a. Recognize that an event is out of sequence.
b. Define a time period during which it will attempt to reconcile out-of-sequence events. Outside this period, the data is considered useless.
c. Be able to update results, which might mean updating a row in a database.
If you are preparing for an interview, this post contains most of the things that you should know about Kafka.
Active Controller Count
The active controller is the one broker in a Kafka cluster that is designated to perform administrative tasks like reassigning partitions. The active controller count metric tells us whether a broker is the controller for the cluster or not. The value of this metric is 0 or 1, and it is emitted per broker.
What if two brokers say that they are the controller?
The active controller count metric indicates whether the broker is currently the controller for the cluster. The metric will be either 0 or 1, with 1 showing that the broker is currently the controller. A Kafka cluster requires one broker to be the controller, and only one broker can be the controller at any given time.
What should you do when more than one broker claims to be the controller? This situation will affect the administrative tasks of the cluster. The first step could be to restart the brokers claiming to be the controller.
Kafka uses the following two thread pools to handle requests:
Network Handlers
These are responsible for reading and writing data to the clients across the network. This does not require significant processing, so network handlers don't get exhausted easily.
Request Handlers
The request handler threads, however, are responsible for servicing the client request itself, which includes reading or writing messages to disk. The request handler idle ratio metric indicates the percentage of time the request handlers are not in use. The lower this number, the more loaded the broker is. It is advisable to check the cluster size or look for other potential problems if the idle ratio goes below 20%.
Kafka uses purgatory to efficiently handle requests. Read about purgatory here.
The all topics bytes in rate, expressed in bytes per second, is useful as a measurement of how much message traffic your brokers are receiving from producing clients. This is a good metric to trend over time to help you determine when you need to expand the cluster or do other growth-related work. It is also useful for evaluating if one broker in a cluster is receiving more traffic than the others, which would indicate that it is necessary to rebalance the partitions in the cluster.
The all topics bytes out rate, similar to the bytes in rate, is another overall growth metric. In this case, the bytes out rate shows the rate at which consumers are reading messages out. The outbound bytes rate may scale differently than the inbound bytes rate.
The outbound bytes rate also includes the replica traffic. This means that if all of the topics are configured with a replication factor of 2, we will see a bytes out rate equal to the bytes in rate when there are no consumer clients.
The messages in rate shows the number of individual messages, regardless of their size, produced per second. This is useful as a growth metric and as a different measure of producer traffic.
The partition count for a broker generally doesn’t change that much, as it is the total number of partitions assigned to that broker. This includes every replica the broker has, regardless of whether it is a leader or follower for that partition.
The leader count metric shows the number of partitions that the broker is currently the leader for. As with most other measurements in the brokers, this one should be generally even across the brokers in the cluster.
This measurement is only provided by the broker that is the controller for the cluster (all other brokers will report 0), and shows the number of partitions in the cluster that currently have no leader.
Kafka – The Definitive Guide by Neha Narkhede, Gwen Shapira & Todd Palino
There are two types of replicas: the leader replica and follower replicas. Let's say that there are three replicas of a partition. One of them is the leader. All requests from producers and consumers go to the leader, in order to guarantee consistency. All replicas other than the leader are called followers. Followers do not serve any requests; their only task is to keep themselves updated with the leader. One of the follower replicas becomes the new leader in case the leader fails. The process by which followers fetch data from the leader is called replication. Please go through the article here if you wish to read more about Kafka internals.
Replication is the core of Kafka.
If replication is not happening, then in case of leader failure we will not have a follower that can be made leader cleanly, which leads to inconsistent data. So, how do we make sure that our partitions are not under-replicated?
Fortunately, Kafka exposes a JMX metric that can tell us whether replication is behaving well. We can export these Kafka metrics to Prometheus and set up an alert. This article briefly discusses the potential reasons for under-replicated partitions and how to debug and fix them.
This is the one must-have metric to monitor. It provides, for each broker in the cluster, a count of the follower replicas on that broker that are not caught up with their leader replicas. This single measurement provides insight into a number of problems with a Kafka cluster, from a broker being down to resource exhaustion.
How to debug under replicated partitions?
Broker-level problems
If any broker is down, all the replicas on it will stop syncing with their leaders, and you will see a constant number of under-replicated partitions.
If the number of under-replicated partitions is fluctuating, or if all the brokers are up and running and you still see a constant count of under-replicated partitions, this typically indicates a performance issue in the cluster. If the under-replicated partitions are on a single broker, then that broker is typically the problem. We can see the list of under-replicated partitions using the kafka-topics.sh tool, as shown in the picture below. Broker 2 is the common broker in the list of replicas that are under-replicated, which means broker 2 is not working well.
Image taken from the book: Kafka – The Definitive Guide showing use of kafka-topics.sh to find non-performing broker
Cluster-level problems
Unbalanced load
How do we define unbalanced load on a broker in a cluster? If any broker has a dramatically higher partition count, or significantly more bytes going in or out compared to the other brokers in the cluster, the load on that cluster can be considered unbalanced. In order to diagnose this problem, we will need several metrics from the brokers in the cluster:
Leader partition count
All topics bytes in rate
All topics messages in rate
Partition count
Here's an example, shown below, of a balanced cluster: all the metrics are approximately the same.
What if traffic is not balanced within the cluster and results in under-replicated partitions?
We will need to move partitions from the heavily loaded brokers to the less heavily loaded brokers. This is done using the kafka-reassign-partitions.sh tool.
Another common cluster performance issue is exceeding the capacity of the brokers to serve requests. There are many possible resource deficits that could slow things down: CPU, disk IO, and network throughput are some of them. Disk utilization is not one of them, as the brokers will operate properly right up until the disk is filled, and then the disk will fail abruptly.
How do we diagnose a capacity problem?
There are many metrics you can track at the OS level, including:
Inbound network throughput
Outbound network throughput
Disk average wait time
Disk percent utilization
CPU utilization
Under-replicated partitions can be the result of exhausting any one of the resources listed above. It's important to know that the broker replication process operates in exactly the same way as other Kafka clients do. If our cluster is having problems with replication, then our clients must be having problems producing and consuming messages as well. It makes sense to develop a baseline for these metrics when the cluster is operating correctly and then set thresholds that indicate a developing problem long before we run out of capacity. We should also review the trends of these metrics as the traffic to our cluster increases over time. As far as Kafka broker metrics are concerned, the all topics bytes in rate is a good guideline to show cluster usage.
Application-level problems
We should also check whether there is another application running on the system that is consuming resources and putting pressure on the Kafka broker. This could be something that was installed to debug a problem, or it could be a process that is supposed to be running, like a monitoring agent, but is having problems. We can use tools on the system, such as top, to identify whether a process is using more CPU or memory than expected.
There could also be a configuration problem that might have crept into the broker or system configuration.
Hardware-level problems
Hardware problems can be as obvious as a server that stops working, or they can be less obvious, quietly causing performance problems. These are usually soft failures that allow the system to keep running, but in degraded mode. This could be a bad bit of memory, where the system has detected the problem and bypassed that segment (reducing the overall available memory). The same can happen with a CPU failure.
For problems such as these, you should use the facilities that the hardware provides, such as an Intelligent Platform Management Interface (IPMI), to monitor hardware health. When there's an active problem, looking at the kernel ring buffer using dmesg will help you see log messages that are being thrown to the system console.
Disk failure
The most common type of hardware failure that leads to a performance degradation in Kafka is a disk failure. Apache Kafka is dependent on disks for the persistence of messages, and producer performance is directly tied to how fast those disks commit writes. Any deviation here will show up as problems with the performance of the producers and the replica fetchers. The latter is what leads to under-replicated partitions. As such, it is important to monitor the health of the disks at all times and address any problems quickly.
A single disk failure on a single broker can destroy the performance of an entire cluster. This is because the producer clients will connect to all brokers that lead partitions for a topic, and if you have followed best practices, those partitions will be evenly spread over the entire cluster. If one broker starts performing poorly and slowing down produce requests, this will cause back-pressure in the producers, slowing down requests to all brokers.
If you are new to Kafka, please read the first two posts of the series given below.
There are a number of measurements collected while Kafka is operational. We are going to collect these measurements in Prometheus and later build a dashboard in Grafana. All of the metrics exposed by Kafka can be accessed via the Java Management Extensions (JMX) interface. One way to use them in an external monitoring system is to attach a collection agent provided by Prometheus to the Kafka process.
What is JMX?
JMX provides an API to monitor and manage your resources at runtime. It gives Java developers the means to instrument Java code, create smart Java agents, implement distributed management middleware and managers, and easily integrate these solutions into existing management and monitoring systems.
The application, in our case Kafka, has to implement a Java interface called MBean. You can see the implementation here. Kafka creates the MBeans and registers them. We now need to access the data by exposing it in some manner. We can get access through JConsole or through the available communication adaptors and connectors. In this tutorial, we are going to use the JMX Prometheus exporter. There are other exporters too, like the Nagios XI check_jmx plugin, jmxtrans, Jolokia and MX4J.
Quickstart Kafka
If your Kafka broker is not set up, please follow the step-by-step guide here.
JMX Prometheus Java Agent
Java agents are part of the Java Instrumentation API. The Instrumentation APIs provide a mechanism to modify the bytecode of methods. This can be done both statically and dynamically, which means that we can change a program by adding code to it without having to touch the actual source code. The result can have a significant impact on the overall behaviour of the application. The JMX Prometheus exporter is a collector that can be configured to scrape and expose MBeans of a JMX target. The data collected in Prometheus can later be shown in Grafana. This exporter is intended to be run as a Java agent, exposing an HTTP server and serving metrics of the local JVM.
Download the JMX Prometheus Java agent using the following command:
Follow the instructions here to download Prometheus. Add localhost:7071 as a scrape target as shown below.
# my global config
global:
  scrape_interval: 5s # Set the scrape interval to every 5 seconds. Default is every 1 minute.
  evaluation_interval: 5s # Evaluate rules every 5 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  - job_name: 'kafka-monitoring'
    #metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['localhost:7071']
Run Prometheus using the following command:
./prometheus --config.file=prometheus.yml
You should be able to see the following screen on Prometheus for any Kafka metric of your choice. I have randomly picked kafka_server_kafkaserver_yammer_metrics_count as the metric.
Import the downloaded JSON into Grafana and you will see a functional dashboard.
Kafka Grafana Dashboard
Guarantee of ordering
Assume that there are two messages, message A and message B, and message B is sent after message A by the same producer. These messages are written to the same partition of a Kafka topic. In this case, Kafka guarantees that the offset of message B is greater than that of message A. This also means that a consumer consuming from this particular partition will always consume message B after it has consumed message A.
Guarantee of commit
Produced messages are considered committed only when the message is written to the partition on all in-sync replicas. Producers can ask for acknowledgment at different levels: when the message is sent over the network, when it is written at the leader, or when it is fully committed. Consumers can only read messages that are committed. Messages that are committed will not be lost as long as at least one replica remains alive.
These basic guarantees can be used to design a reliable system. However, they alone are not sufficient. We live in a world of practicalities where infrastructure cost is limited (and therefore the number of replicas), traffic to the system is sometimes not predictable (and hence the load on individual brokers), and so on. So, how can we make our system more reliable?
Reliability at the broker level
Replication Factor
You can create a topic with a topic-level configuration called replication.factor. If you choose not to specify the replication factor at the topic level, topics are created with the broker-level default, default.replication.factor.
A replication factor of N means that you can afford to lose N-1 brokers without affecting reads and writes of data. That means fewer disasters, but you will need at least N brokers and store N copies of the data, increasing infrastructure cost.
But what is the right number of replicas for your Kafka topic?
It really depends on how critical your application is. If you are okay with some data being lost when a broker is restarted, a replication factor of 1 may be fine. Banking applications may need a higher replica count to ensure high availability. One way to think about the replication factor is in terms of how costly unavailability of your topic is. A topic that processes click streams for sending promotional messages may be less critical than a topic that processes banking transactions.
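As an illustration, creating a topic with a replication factor of 3 from Go might look like the sketch below. This assumes the third-party Shopify/sarama client library; the broker address, topic name and partition count are placeholders:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // admin APIs need the broker version to be set

	admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	// Replication factor 3: the topic survives the loss of up to two brokers.
	err = admin.CreateTopic("payments", &sarama.TopicDetail{
		NumPartitions:     6,
		ReplicationFactor: 3,
	}, false)
	if err != nil {
		log.Fatal(err)
	}
}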
Unclean Leader Election
If the leader goes down and there are in-sync replicas available, one of the in-sync replicas will become the leader. But what happens when the leader goes down and there is no in-sync replica?
How can this happen though?
Let's say there are three replicas. Assume two followers go down while the producer continues writing to the leader; the two followers become out of sync. Now, if the leader goes down and one of the followers starts again, we will have an out-of-sync replica as the only available replica.
Alternatively, assume that the two followers merely lag in syncing with the leader while the leader keeps accepting reads and writes. Now, if the leader goes down, the two followers, one of which could be the potential new leader, are essentially out of sync.
What options do we have now?
If we don’t allow these out-of-sync replicas to become the leader, we might have significant downtime which can hurt the business.
And if we allow one of these out-of-sync replicas to become the leader, we risk data loss and data inconsistencies. Imagine the two out-of-sync replicas have offsets from 2000 to 3000 while the old leader has reached offset 4000. If the leader goes down and one of the out-of-sync replicas becomes the leader, new offsets in the new leader will start from 3001. Consumers that have already read data up to offset 4000 will not read the new data written at offsets 3001 to 4000 on the new leader. This leads to data inconsistency.
Unclean leader election is the configuration where we allow out-of-sync replicas to become the leader. It is enabled by setting the unclean.leader.election.enable flag to true. The flag is set to false where data quality and consistency are critical.
Minimum In-Sync Replicas
As per Kafka's reliability guarantees, data is considered committed when it is written to all in-sync replicas, even when "all" means just one replica, in which case the data could be lost if that replica is unavailable. To make sure that committed data is written to more than one replica, the minimum number of in-sync replicas needed to consider a message committed can be set to a value greater than 1. This is done by a configuration called min.insync.replicas, which can be set at both the topic and the broker level.
What happens when a producer tries to send a message and there are not enough in-sync replicas?
Producers will get the NotEnoughReplicasException.
Reliability at the producer level
Send Acknowledgments
Producers can choose between three different acknowledgment modes (a Go client sketch follows this list):
acks=0 means that a message is considered written successfully to Kafka if the producer managed to send it over the network. We will still get serialization errors locally, but we won't get an error if, say, the Kafka cluster is down.
acks=1 means that the leader will send either an acknowledgment or an error the moment it gets the message and writes it to the partition data file. We can lose data if the leader crashes while messages that were successfully written to the leader and acknowledged had not yet been replicated to the followers.
acks=all means that the leader will wait until all in-sync replicas get the message before sending back an acknowledgment or an error.
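A sketch of these modes with a Go client (again assuming the Shopify/sarama library; the broker address and topic are placeholders):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// acks=all: wait until all in-sync replicas have the message.
	// sarama.NoResponse corresponds to acks=0, sarama.WaitForLocal to acks=1.
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true // required by the SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// SendMessage blocks until the chosen acknowledgment level is satisfied.
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "payments",
		Value: sarama.StringEncoder("txn-123"),
	})
	if err != nil {
		log.Fatal(err)
	}
}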
Configuring Producer Retries
There are two parts to handling errors in the producer:
The errors that producers handle automatically. Errors like LEADER_NOT_AVAILABLE are retriable: a leader might be available if the request is retried.
The errors that you, as the developer using the producer library, must handle. Errors like INVALID_CONFIG are not retriable; they have to be fixed by the developer before the request can succeed.
Retries and careful error handling can guarantee that each message will be stored at least once, but we can't guarantee it will be stored exactly once. Applications should therefore make message handling idempotent, meaning that even if the same message is delivered twice, it has no negative impact on correctness.
Reliability at the consumer level
To ensure data reliability, we need to take care of the following configurations; a Go client sketch follows this section:
group.id
The basic idea is that if two consumers have the same group ID and subscribe to the same topic, each will be assigned a subset of the partitions in the topic and will therefore only read a subset of the messages individually but all the messages will be read by the group as a whole.
auto.offset.reset
This can have two possible values – latest and earliest.
When a consumer starts for the first time, it does not know which offset to ask for. In this scenario, if the flag is set to earliest, the consumer will start consuming from the first message the partition has. On the other hand, if the flag is set to latest, the consumer will start consuming from the latest committed message in the partition.
enable.auto.commit
Offsets have to be committed. Period.
The real question is: when?
If we set the enable.auto.commit flag to true, the consumer automatically commits offsets after the time interval specified in the auto.commit.interval.ms flag. This is fine until an offset is committed for a message that has been read but not yet processed. In general, committing more frequently adds some overhead but reduces the number of duplicates that can occur when a consumer stops. As a general rule, we should always commit offsets after events are processed.
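A sketch of these settings with a Go client (assuming a recent version of the Shopify/sarama library; to actually consume, you would still implement a ConsumerGroupHandler and call group.Consume in a loop):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // consumer groups need the broker version to be set

	// Equivalent of auto.offset.reset=earliest: start from the oldest
	// available message when the group has no committed offset yet.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Disable auto-commit so offsets are committed only after processing.
	config.Consumer.Offsets.AutoCommit.Enable = false

	// Consumers sharing this group.id split the topic's partitions among themselves.
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "billing-service", config)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()
}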
Handling Long Processing Times
In some versions of the Kafka consumer, we can't stop polling for more than a few seconds. Even if we don't want to process additional records, we must continue polling so the client can send heartbeats to the broker. A common pattern in these cases is to hand off the data to a thread pool with multiple threads, to speed things up by processing in parallel. After handing off the records to the worker threads, you can pause the consumer and keep polling without actually fetching additional data until the worker threads finish. Once they are done, we can resume the consumer. Because the consumer never stops polling, heartbeats are sent as planned and rebalancing is not triggered.
A quick recap of the basic terminology of Kafka can be found here.
The fundamental unit that Kafka manages is called a message. A message is simply a byte array, without any restriction on format. As you would know, producers produce data to a topic. Each topic is further divided into multiple partitions. Messages are sent from the producer to Kafka in batches. A batch is a collection of messages produced to the same topic and partition. This batching is done to avoid a round trip across the network for each individual message.
But why is the data in a topic stored in multiple partitions?
Imagine a partition as a file. Each message is appended at the end of the file, and messages are read from the start to the end of the file. Partitions can be distributed across brokers. Since messages are produced to different partitions of a topic, one cannot expect a guarantee of time ordering across all messages. However, messages within one partition have a guarantee of time ordering.
Partitions are the way Kafka ensures scalability and redundancy. Each partition can be hosted on a different server, which means that a single topic can be scaled horizontally across multiple servers to provide performance far beyond the ability of a single server. This effectively means that when the count of messages is low, you can have all your partitions on one broker. On the other hand, if the message count and size increase, you can have just one partition per server. One particular partition cannot be split between multiple brokers, nor even between multiple disks on the same broker. Therefore, the size of a partition is limited by the space available on a single mount point.
Replication is the core of Kafka. Each partition is replicated for redundancy and for the inevitable failure of hardware.
How does Kafka handle replication?
There are two types of replicas: the leader replica and follower replicas. Let's say there are three replicas of a partition. One of them is the leader. All requests from producers and consumers go to the leader, in order to guarantee consistency. All replicas other than the leader are called followers. Followers do not serve any requests; their only task is to keep themselves updated with the leader. One of the follower replicas becomes the new leader in case the leader fails.
How does a follower stay in sync with the leader?
The follower sends FETCH requests to the leader. Each request contains the offset of the message the follower wants to receive next, and these offsets are always in order: a replica will request message 1, then message 2, and then message 3, and it will not request message 4 before it gets messages 1, 2 and 3. Therefore, when a replica requests message 4, the leader knows that the replica got all messages up to message 3. By looking at the last offset requested by each replica, the leader knows how far behind each replica is. A replica is considered out of sync if it hasn't requested a message in more than ten seconds, and an out-of-sync replica cannot become the leader.
What protocol does Kafka use for communication? Is it HTTP?
Kafka does not use HTTP. Kafka uses a binary protocol over TCP. The protocol defines all APIs as request and response message pairs. The client initiates a socket connection and then writes a sequence of request messages and reads back the corresponding response messages. The client will likely need to maintain connections to multiple brokers, as data is partitioned and clients need to talk to the broker that has their data.
The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. The broker’s request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server.
The server has a configurable maximum limit on request size and any request that exceeds this limit will result in the socket being disconnected.
Example of FETCH Request
What do Kafka developers have to say about their choice of communication protocol?
Let's take an example: suppose we have three brokers and one topic with 9 partitions and a replication factor of 2. That means we have a total of 18 partition replicas.
Now, Kafka will try to assign 6 partitions (18/3) to each broker, ensuring that two replicas of the same partition are never on the same broker. If rack information is available, it is also considered when allocating partitions to brokers.
How does Kafka retain a message for the specified duration?
Each topic has a specified retention period, or a size limit, after which messages are deleted, even if not all consumers have read them. Kafka splits each partition into segments. By default, each segment contains either 1 GB of data or a week of data, whichever is smaller. While a Kafka broker is writing to a partition, if the segment limit is reached, it closes the file and starts a new one. The segment currently being written to is called the active segment. The active segment is never deleted, so if you set log retention to only store a day of data but each segment contains five days of data, you will really keep data for five days, because data cannot be deleted before its segment is closed. If you choose to store data for a week and roll a new segment every day, you will see that every day a new segment is rolled while the oldest segment is deleted, so most of the time the partition will have seven segments.
What is the file format of each segment?
Each segment is stored in a single data file. Inside the file, we store Kafka messages and their offsets. The format of the data on disk is identical to the format of the messages sent from the producer to the broker, and later from the broker to the consumers. Using the same message format on disk and over the wire is what allows Kafka to use zero-copy optimization when sending messages to consumers, and also to avoid decompressing and recompressing messages that the producer already compressed. Each message contains, in addition to its key, value, and offset, things like the message size, a checksum code that allows us to detect corruption, a magic byte that indicates the version of the message format, the compression codec (Snappy, GZip, or LZ4), and a timestamp.
What are indexes in Kafka?
Kafka allows consumers to start fetching messages from any available offset. This means that if a consumer asks for 1 MB messages starting at offset 100, the broker must be able to quickly locate the message for offset 100 (which can be in any of the segments for the partition) and start reading the messages from that offset on. In order to help brokers quickly locate the message for a given offset, Kafka maintains an index for each partition. The index maps offsets to segment files and positions within the file.
What is compaction in Kafka?
Kafka allows the retention policy on a topic to be set to compact, which stores only the most recent value for each key in the topic. Obviously, setting the policy to compact only makes sense for topics whose applications produce events that contain both a key and a value. If the topic contains null keys, compaction will fail.