Kafka Storage Architecture
How Kafka topics are logically organised
Create Topic
kafka-topics.bat --create --zookeeper localhost:2181 --topic invoice --partitions 5 --replication-factor 3
A topic can store millions of messages.
To store them efficiently, a topic is split into partitions.
For Apache Kafka, a partition is a physical directory on the broker's disk.
Kafka organises messages in topics.
The broker creates log files to store the messages.
These log files are partitioned, replicated, and segmented.
Just as in a database we create tables to store records, and each table has a data file,
in Kafka we create topics to store messages, and each topic has log files.
Replication Factor
The replication factor specifies how many copies we want to maintain for each partition.
Number of replicas (15) = partitions (5) x replication factor (3)
If the cluster has 3 brokers, then 5 partition directories will be created on each broker (15 replicas / 3 brokers).
Classification of Partition Replicas
Leader and Followers
- There are leader partition replicas and follower partition replicas
- The number of followers depends on the replication factor
- The following command shows which broker holds the leader of each partition
- kafka-topics.bat --describe --zookeeper localhost:2181 --topic xyz
A follower is a duplicate copy of the leader.
Log Files
- Kafka stores data in log files
- Log files are split into segments; a segment file grows until the segment limit is reached
- When the limit is reached, Kafka closes the file and creates a new one
- The default limit is 1 GB of data or 1 week of data, whichever comes first
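These limits correspond to broker settings; a sketch of the relevant server.properties entries with their standard default values (worth verifying for your Kafka version):
log.segment.bytes=1073741824    (1 GB segment size limit)
log.roll.hours=168              (roll to a new segment after 1 week)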
Offset
- Each Kafka message in a partition is uniquely identified by a 64-bit integer offset
- The offset is unique within a partition
- Once the limit for a segment is reached, offsets in the next segment continue from the prior segment
- Offsets start at ZERO in EACH partition
Since the offset is not unique within a topic, locating a message requires 3 things:
- TOPIC NAME
- PARTITION NUMBER
- OFFSET NUMBER
- In a typical flow, the consumer application requests messages based on offset.
- It first requests messages starting from offset 0; the broker returns, say, messages up to offset 9, which the consumer processes.
- It then requests the next set; the broker returns messages up to offset 15, and the next request asks for messages after offset 15.
Kafka allows a consumer to request messages from a given offset.
If a consumer asks for messages beginning at offset 100, the broker must be able to locate the message starting at offset 100.
To help locate it quickly, an index file is used.
The index file is also segmented and lives inside the partition directory.
If we want to seek messages based on timestamp, a timestamp index is used.
We may need all messages after some timestamp; to support this, Kafka maintains a timestamp index.
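A minimal sketch of both lookups using the Java consumer, assuming a broker at localhost:9092, the invoice topic from earlier, and an arbitrary example timestamp (the class name SeekDemo is just illustrative). Note how topic name, partition number, and offset together locate a message:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Topic name + partition number + offset together identify a message.
            TopicPartition tp = new TopicPartition("invoice", 0);
            consumer.assign(Collections.singletonList(tp));

            // Seek by offset: the broker uses the offset index to find offset 100 quickly.
            consumer.seek(tp, 100L);

            // Seek by timestamp: the broker uses the timestamp index to map a
            // timestamp (example value below) to the first offset at or after it.
            Map<TopicPartition, OffsetAndTimestamp> byTime =
                consumer.offsetsForTimes(Collections.singletonMap(tp, 1_600_000_000_000L));
            OffsetAndTimestamp hit = byTime.get(tp);
            if (hit != null) {
                consumer.seek(tp, hit.offset());
            }
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        }
    }
}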
Scalability and how a Kafka cluster is formed
A cluster is a group of brokers that work together to share the workload.
Let's see who manages cluster membership and who performs the administrative tasks.
In a typical cluster there is a master that maintains the group of workers.
The master always knows the state of its members:
it tracks which broker has crashed and whether any new broker has joined.
Kafka is a master-less cluster; it doesn't follow the master-slave pattern.
Kafka uses ZooKeeper to maintain the list of active brokers.
Each broker is assigned a unique id, which is defined in its configuration.
We also define the ZooKeeper connection details in the broker configuration file.
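A sketch of the two relevant entries in each broker's server.properties (values illustrative):
broker.id=0
zookeeper.connect=localhost:2181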
When a broker starts, it connects to ZooKeeper and creates an ephemeral node using its broker id to represent an active broker session.
The ephemeral node remains active as long as the broker session is alive.
When the broker loses connectivity, ZooKeeper removes the ephemeral node.
The list of active brokers is maintained through these ephemeral nodes,
kept under the /brokers/ids path in ZooKeeper.
We can check the broker ids using the ZooKeeper shell.
Start the ZooKeeper shell, then:
ls /
shows the root hierarchy
ls /brokers/ids
lists the active broker ids
Kafka is a master-less cluster.
However, we still need someone to monitor the active brokers and reassign work when a broker leaves the cluster.
All these activities are performed by the controller in a Kafka cluster.
The controller is not a master;
it is simply a broker that is elected as controller to take on extra responsibility.
The controller also acts as a regular broker.
There is only one controller in a Kafka cluster at any given time.
The controller is responsible for monitoring the active brokers in the cluster.
When any broker leaves the cluster, the controller reassigns its work to other brokers.
Controller selection is simple: the first broker that starts in the cluster becomes the controller.
The first broker gets elected as controller by creating an ephemeral node (/controller) in ZooKeeper.
Other brokers also try to get elected as controller, but when they see the ephemeral controller node already exists, a "controller already exists" exception is raised.
They then start watching for the controller node to disappear.
When the controller dies, the ephemeral node disappears;
each broker then tries to become the controller, but only one succeeds.
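We can see which broker currently holds the controller role from the same ZooKeeper shell; the /controller node stores the elected broker's id:
get /controller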
Kafka Distributed Architecture
Kafka partition data is self-contained, i.e. all segment and index files of a partition are in the same directory.
Kafka Cluster
Every Kafka broker is responsible for managing one or more partitions
A Kafka cluster is a group of brokers. These brokers may be running on individual machines, and those machines should be organised across multiple racks.
Example
In the diagram we have a 6-node cluster using 2 racks.
Partition allocation
- Partitions are distributed as evenly as possible to balance the workload
- Follower replicas must be placed on different machines from their leader
To distribute the partition replicas, Kafka follows these steps:
- Build an ordered list of brokers
- Assign leaders and followers
Kafka chooses a random broker as the first broker and places it in the list,
then puts the remaining brokers in order by picking brokers from alternate racks.
Once the broker list is prepared, Kafka assigns leaders and followers.
It makes sure that if a rack fails, a backup copy is available on the other rack, and if any broker fails, a backup copy is available on another machine.
First, leader partitions are assigned in sequence to the brokers from the list.
Once leaders are assigned, followers are assigned: the first follower starts by skipping the first broker and is then assigned in sequence.
The second follower follows the same pattern, shifting the starting point one broker further.
Note that perfectly even distribution is not achieved here:
broker 0 holds 4 partition replicas,
while broker 4 holds 6.
But we have achieved fault tolerance:
even if rack 1 goes down, we still have at least one copy of every partition on rack 2.
The leader partition is responsible for all requests from producers and consumers.
A Kafka producer connects to any one broker and gets the metadata.
Every Kafka broker can answer the metadata request, hence the producer can connect to any broker and query for metadata.
The metadata response contains the list of leaders and their host and port information,
so the producer has the list of all leaders.
The producer communicates directly with the leader partition.
On receiving a message, the leader broker persists the message and sends back an acknowledgement.
Similarly, when a consumer wants messages, it always reads from the leader partition.
So the leader broker's responsibility is to interact with producers and consumers.
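A small sketch of the metadata lookup described above, via the Java producer client (broker address and the invoice topic assumed; MetadataDemo is just an illustrative name):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class MetadataDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // any broker can answer metadata
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The metadata response lists, for each partition, the leader's host and port.
            for (PartitionInfo info : producer.partitionsFor("invoice")) {
                System.out.println("partition " + info.partition()
                        + " -> leader " + info.leader().host() + ":" + info.leader().port());
            }
        }
    }
}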
Followers do not communicate with producers and consumers; their only job is to stay up to date with the leader.
The aim of a follower is to get elected as the leader when the current leader dies.
So their single-point agenda is to stay in sync with the leader; only then do they have a high chance of getting elected as leader.
To stay in sync, a follower requests data from the leader and persists it.
Sometimes a follower may lag behind the leader; the main reasons are network congestion or broker failures.
The leader has one more important job: maintaining the list of in-sync replicas.
This list is known as the ISR - the In-Sync Replicas of the partition.
It is persisted in ZooKeeper
and maintained by the leader broker.
It contains the followers that are in sync with the leader,
so they are excellent candidates to be elected leader if the current leader fails.
How does the leader know which followers are in sync?
When a replica requests data from a particular offset, the leader can tell how far behind the replica is,
and based on that it adds or removes the replica from the ISR list.
To judge "not too far", it checks whether the replica is less than 10 seconds behind (controlled by the replica.lag.time.max.ms config).
If the replica falls further behind, the leader removes it; if it is within 10 seconds, it stays in the ISR.
There is one gotcha here:
what if all followers are behind by more than 10 seconds and the broker holding the leader partition crashes?
This leads to 2 more concepts:
committed vs uncommitted messages
minimum in-sync replicas
We can configure the leader not to consider a message committed until it is copied to all in-sync replicas,
so the leader will hold both committed and uncommitted messages.
If we lose the leader, the committed messages survive on the followers, but we lose the uncommitted messages that existed only on the leader.
Uncommitted messages shouldn't be a worry, because those messages can be resent by the producer.
The producer can choose to receive acknowledgements,
wait for a timeout period, and resend the message until it gets the acknowledgement.
So in case uncommitted messages are lost because of a leader failure, the newly elected leader will receive the uncommitted messages again from the producer.
MINIMUM IN-SYNC REPLICAS
Data is considered committed when it is written to all in-sync replicas.
Suppose we have 3 replicas, all in sync, so a message is committed once all 3 have it.
If 2 replicas go down, the leader removes them from the ISR. In this case, even though we configured 3 replicas, we are left with just one: the leader itself.
Data will then be considered committed when it is on all ISR replicas, which here is just the leader. This is a risky scenario, because the data will be lost if we lose the leader.
Kafka protects against this with a minimum in-sync replica setting for the topic.
If we want to be sure that data is written to at least 2 replicas, set min.insync.replicas=2.
So if 2 out of 3 replicas are not in sync, the broker will not accept any messages and the partition becomes read-only,
raising a "not enough replicas" exception to producers.
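As a sketch, this topic-level setting can be applied with the config tool (invoice topic assumed; older ZooKeeper-based syntax matching the commands above):
kafka-configs.bat --zookeeper localhost:2181 --entity-type topics --entity-name invoice --alter --add-config min.insync.replicas=2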
Kafka Producer Internals
Each message carries a timestamp, which can be one of two types:
0 = CreateTime: the producer's timestamp
1 = LogAppendTime: the time when the broker received the message
We can use only one of them; this is chosen with the message.timestamp.type topic config, which defaults to CreateTime.
We package the message content in a producer record object with at least 2 mandatory arguments:
- topic name
- message value: the main content
Optional items:
- message key: critical; used for partitioning, grouping, and joins
- message timestamp: rarely set
- target partition: rarely set
Once ready, we hand the record over to the Kafka producer using the send method.
The Kafka producer then delivers the message to the broker over the network.
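A minimal sketch of this hand-off using the Java producer API, assuming a broker at localhost:9092 and the invoice topic (ProducerDemo and the key/value strings are illustrative); the serializer settings anticipate the steps described next:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        // Key and value serializers, used in the Serializer step described below.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Mandatory: topic name and message value. The key is optional
            // but, when present, drives partitioning.
            ProducerRecord<String, String> record =
                new ProducerRecord<>("invoice", "inv-1001", "invoice payload");
            producer.send(record); // returns immediately; the record goes to the buffer
        }
    }
}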
Before the producer hands the message to the broker, it does many things.
Every record goes through:
- Serializer: necessary to send data over the network; we have to specify a key serializer and a value serializer.
- Partitioner: if a message key is present, a hashing algorithm is used; otherwise round-robin partitioning is used.
- Buffer: once serialized and partitioned, the record sits in a partition-wise buffer; the buffer holds records that have not yet been sent to the broker.
The producer also runs a background I/O thread that is responsible for turning these records into requests and transferring them to the broker.
Buffering is needed for asynchronous operation and network optimisation.
The send method adds the record to the buffer and returns without blocking;
those records are then transmitted by the background I/O thread,
so the send method is not delayed by the network operation.
Buffering also allows the I/O thread to combine multiple messages from the same buffer and transmit them as a single packet; this way we achieve better throughput.
If records are posted faster than they can be transmitted to the server, the buffer space will be exhausted,
and the next send call will block for a few milliseconds until the buffer is freed by the I/O thread.
If the I/O thread takes a long time to free up the buffer, the send method raises a timeout exception.
If we see this timeout error, we have to increase the producer memory.
The default producer memory is 32 MB;
to increase it, change the buffer.memory config.
When the broker receives a message, it sends back a success acknowledgement.
If the broker fails to write, it returns an error.
If the I/O thread does not receive a success or an error, it may retry 2-3 times before giving up and throwing back the error.
The number of retries is set with the retries producer config.
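As a sketch, these knobs go into the same props object as in the producer example above (values are illustrative, not recommendations):
props.put("buffer.memory", "67108864"); // raise the producer memory from the 32 MB default to 64 MB
props.put("retries", "3");              // how many times the I/O thread retries a failed send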
At-Least-Once
Kafka sends the message to the leader partition, and the followers then sync up with the leader.
As soon as the ISR minimum replicas are updated, the message is deemed committed and an acknowledgement is sent to the producer's I/O thread.
If for some reason, say a network delay or error, the I/O thread doesn't get the acknowledgement, it sends the same message again, which causes a duplicate;
there is no way for the broker to identify that this is a duplicate.
This is the AT-LEAST-once mechanism of Kafka,
where Kafka wants at least one success acknowledgement.
There is an AT-MOST-once semantics too, which can be achieved by setting retries to 0.
Exactly-once is also possible.
It is enabled by setting enable.idempotence=true, which gives the producer two things:
- An internal ID for the producer instance: the producer does an initial handshake with the leader broker and asks for a producer ID; on the broker side, the broker dynamically assigns a unique ID to each producer.
- Message sequence numbers: the producer assigns a monotonically increasing sequence number to each message.
So every message carries a producer ID and a sequence number.
Every time the leader gets a message, it checks the last received sequence number from that producer and can thereby identify missing or duplicate messages.
Idempotence handles duplicates only for retries,
not for application-level duplicates (e.g. the application calling send twice with the same data).
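Enabling idempotence is one more producer property, again added to the props object from the earlier sketch:
props.put("enable.idempotence", "true"); // broker assigns a producer ID; messages carry sequence numbers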
Aggregation
Tumbling window
Hopping window (Sliding window)