Saturday, 2 October 2021

Data Structures



memory slot = 8 bits

8 bits = 1 byte

On a 32-bit system, we require 4 bytes for each individual element;

on a 64-bit system, we require 8 bytes.


log_b(x) = y  =>  b^y = x

log(x) with base 2 equals y when 2^y = x.

For computers, the base is always 2.


So what should be the value of log(1)?

To find the value of log(1), all you have to do is use this equation.

So we have 2^y = 1.

We need to find the power of 2 that gives us 1.

Since 2^0 = 1, y = 0. That means log(1) = 0.


log(2) = 1

log(4) = 2

log(8) = 3

log(16) = 4

log(512) = 9

log(1024) = 10

log(1048576) = 20

When we double the value of n, the value of log(n) increases by 1.
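A quick way to sanity-check these values (a minimal sketch using Python's standard math module):

import math

# doubling n increases log2(n) by exactly 1
for n in [1, 2, 4, 8, 16, 512, 1024, 1048576]:
    print(n, int(math.log2(n)))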


So data structures are of two types: primitive and non-primitive.

Primitive types you already know: Integer, Float, Pointer, Char, and more,

whereas non-primitive types can be divided into two parts: Linear data structures and Non-Linear data structures.

Linear structures are sequential; non-linear structures are non-sequential.

Linear data structures include Array, Stack, Queue, Linked List, and a few others; non-linear data structures include Graphs and Trees.


Array


a variable such as vooc = [1, 2, 3]

Since we are using a 32-bit system, this will take 12 bytes of space, which is 12 memory slots: 4 slots per value.

There are basically two types: static and dynamic. If you are using C, C++, or Java, you might have noticed that while initializing any array you have to give its size. If you want three elements, you have to specify that size while declaring the array.


The second one is the dynamic array. If you are using Python or JavaScript, then by default you are using a dynamic array; you don't have to provide any size, it is taken care of by the runtime itself.


With a static array, once the memory slots are allocated they never change. Since we already declared the size while initializing and it is fixed, we cannot simply append more elements; static arrays are fixed in size and we cannot change them.


Now, if I talk about dynamic arrays: while initializing we don't have to specify any size, and we can append as many elements as we want. Since they are dynamic, there are several advantages that come with this kind of array.

Static array: needs a contiguous chunk of memory. If a new item is added and there is no contiguous space left, the whole array is copied (shifted) to a new address where a contiguous chunk is available.

Dynamic array: allocates memory in powers of 2. So when we need 5 items, it allocates 2^3 = 8 slots; for a 9th value it reallocates 2^4 = 16.
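A minimal sketch of this growth behaviour (a toy DynamicArray class of my own for illustration; CPython's built-in list actually uses a different over-allocation factor):

class DynamicArray:
    """Toy dynamic array that doubles its capacity (powers of 2) when full."""
    def __init__(self):
        self.capacity = 1
        self.size = 0
        self.slots = [None] * self.capacity

    def append(self, value):
        if self.size == self.capacity:          # out of contiguous space
            self.capacity *= 2                  # grow to the next power of 2
            new_slots = [None] * self.capacity
            new_slots[:self.size] = self.slots  # copy existing items over
            self.slots = new_slots
        self.slots[self.size] = value
        self.size += 1

arr = DynamicArray()
for i in range(9):
    arr.append(i)
print(arr.size, arr.capacity)   # 9 16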

Linked list: each node takes 8 bytes (4 to store the value, 4 to store the address of the next node). The first node is the head; the last node is the tail and points to None.

Doubly linked list: each node takes 12 bytes (4 to store the value, 4 to store the previous node's address, 4 to store the next node's address).
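A minimal sketch of the two node layouts (illustrative only; Python object fields are references, not fixed 4-byte slots):

class Node:                      # singly linked list node: value + next
    def __init__(self, value):
        self.value = value
        self.next = None         # the tail's next points to None

class DNode:                     # doubly linked list node: value + prev + next
    def __init__(self, value):
        self.value = value
        self.prev = None
        self.next = None

# head -> 1 -> 2 -> None
head = Node(1)
head.next = Node(2)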

STACK = LIFO (last in, first out). pop removes the last item, peek/top returns the last item without removing it, push puts a new item on top (typically implemented with an array).


Queue = FIFO (first in, first out). dequeue removes the first item, enqueue puts a new item at the end, peek views the first item (typically implemented with a linked list).
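A minimal sketch of both in Python, using a list as the stack and collections.deque as the queue:

from collections import deque

stack = []
stack.append(1)         # push
stack.append(2)
print(stack[-1])        # peek/top -> 2
print(stack.pop())      # pop -> 2 (last in, first out)

queue = deque()
queue.append("a")       # enqueue
queue.append("b")
print(queue[0])         # peek -> "a"
print(queue.popleft())  # dequeue -> "a" (first in, first out)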




HEAP

A heap can be arranged in an array:

                     80
           52                    76
     72         34         56         67

Array:  80   52   76   72   34   56   67
Index:   0    1    2    3    4    5    6


Child positions: for a node at index i, the left child is at 2i + 1. Example: 52 is at index 1, so its left child is at 2*1 + 1 = 3 (72),

                 and the right child is at 2i + 2: 2*1 + 2 = 4 (34).

The same way, the parent of the node at index i is at (i - 1) // 2. So for a new child at index 7, the parent is (7 - 1) // 2 = 3, i.e. 72 will be the parent of the child at the 7th position.


Nodes at odd index positions are left children and nodes at even index positions (other than the root at index 0) are right children.
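A quick sketch checking those index formulas against the array above:

heap = [80, 52, 76, 72, 34, 56, 67]

def left(i):   return 2 * i + 1
def right(i):  return 2 * i + 2
def parent(i): return (i - 1) // 2

print(heap[left(1)], heap[right(1)])  # 72 34 (children of 52 at index 1)
print(parent(7))                      # 3 -> a new element at index 7 would have 72 as its parent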



Binary Search

def binarySearch(my_array, target):

    left = 0

    right = len(my_array) - 1


    while left <= right:

        middle = (left + right) // 2

        middle_ele  = my_array[middle]


        if target == middle_ele:

            return middle

        elif target < middle_ele:

            right = middle - 1

        else:

            left = middle + 1


    return -1



print(binarySearch([1,5,10,12,25,30,32], 30))


Binary Search with Recursion


def binarySearchRecursion(my_array, target):

    left = 0

    right = len(my_array) - 1

    result = helper(my_array, target, left, right)

    return result


def helper(my_array, target, left, right):

    if left > right:

        return -1


    middle = (left + right) // 2

    middle_ele  = my_array[middle]


    if target == middle_ele:

        return middle

    elif target < middle_ele:

        return helper(my_array, target, left, middle - 1)

    else:

        return helper(my_array, target, middle + 1, right)


    return -1



print(binarySearchRecursion([1,5,10,12,25,30,32], 30))


def selectionSort(arr):

    """sort by looking for smallest item and put it in place 

complexity is O(n^2)

"""

    for i in range(len(arr)):

        min_x = i

        for item in range(i+1, len(arr)):

            if arr[item] < arr[min_x]:

                min_x = item

        arr[i], arr[min_x] = arr[min_x], arr[i]

arr = [20,12,10,15,2]

selectionSort(arr)

print(arr)



Friday, 13 August 2021

Kafka

Kafka Storage Architecture 

How Kafka topics are logically organised

Create Topic

kafka-topics.bat --create --zookeeper localhost:2181 --topic invoice --partitions 5 --replication-factor 3


A topic can store millions of messages.

To store them efficiently, topics are split into partitions.

For Apache Kafka, a partition means a physical directory.


Kafka organizes messages in topics.
Brokers create log files to store the messages.
These log files are partitioned, replicated and segmented.




A topic is a logical name to group messages.
Just as in a database we create a table to store records and the table has a data file,
in Kafka we create a topic to store messages and the topic has log files.

Replication Factor

The replication factor specifies how many copies we want to maintain for each partition.

Number of replicas (15) = partitions (5) * replication factor (3)

If the number of brokers is 3, then 5 partition directories will be created on each broker.




Classification of Partition Replicas

Leaders and Followers

  • There are leader partition replicas and follower partition replicas
  • The number of followers depends on the replication factor
  • The following command shows which broker holds the leader partition
  • kafka-topics.bat --describe --zookeeper localhost:2181 --topic xyz

A follower is a duplicate copy of the leader.





Log File
  • Kafka stores data in log files
  • Log files are split into segments; a segment file grows until the segment limit is reached
  • When the limit is reached, Kafka closes the file and creates a new one
  • The default maximum segment size is 1 GB or 1 week of data

Offset

  • Each Kafka message in a partition is uniquely identified by a 64-bit integer offset
  • The offset is unique within a partition
  • Once the limit for a segment is reached, offsets in the next segment continue from the prior segment
  • Offsets start at ZERO in EACH partition
Since the offset is not unique within a topic, to locate a message we need 3 things:
  1. TOPIC NAME
  2. PARTITION NUMBER
  3. OFFSET NUMBER
  • In a typical flow the consumer application requests messages based on offset.
  • Starting from offset 0, it requests messages; the broker sends, say, the first batch up to offset 9, and the consumer processes it
  • Then it requests the next set; the broker sends up to offset 15, and next time the consumer requests messages after the 15th offset
Kafka allows a consumer to request messages from a given offset.
If a consumer asks for messages beginning with offset 100, the broker must be able to locate the message starting from offset 100.
To help locate it quickly, an index file is used.
The index file is also segmented and located inside the partition directory.

If we want to seek messages based on timestamp, a timestamp index is used.
We may need all messages after some timestamp; to support this, Kafka maintains a timestamp index.
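A minimal sketch of seeking by offset from the consumer side (assuming the kafka-python client; topic name, partition and offset are illustrative):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
tp = TopicPartition("invoice", 0)     # topic name + partition number
consumer.assign([tp])
consumer.seek(tp, 100)                # ask the broker for messages starting at offset 100
for record in consumer:               # blocks, polling for records
    print(record.offset, record.value)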


Kafka Cluster Architecture

Scalability and how a Kafka cluster is formed.

A cluster is a group of brokers which work together to share the workload.



Let's see who manages cluster membership and who manages administrative tasks.

In a typical cluster there is a master which maintains the group of workers.
The master always knows the state of the members:
which broker has crashed and whether any new broker has joined.

A Kafka cluster is master-less; it doesn't follow the master-slave pattern.

Kafka uses Zookeeper to maintain the list of active brokers.
Each broker is assigned a unique id, which is defined in its config.
We also define the Zookeeper connection details in the broker configuration file.

When a broker starts, it connects to Zookeeper and creates an ephemeral node using the broker id to represent an active broker session.
The ephemeral node remains active as long as the broker is active.
When the broker loses connectivity, Zookeeper removes the ephemeral node.

The list of active brokers is maintained through these ephemeral nodes;
it is maintained under the /brokers/ids path in Zookeeper.

We can check broker ids using the Zookeeper shell:
start the Zookeeper shell, then
ls /
shows the hierarchy

ls /brokers/ids
shows the active brokers


Kafka is a master-less cluster.
However, we still need someone to monitor the active brokers and reassign the work when an active broker leaves the cluster.
All these activities are performed by the controller in a Kafka cluster.
The controller is not a master;
it is simply a broker which is elected as controller to take up the extra responsibility.
The controller also acts as a regular broker.


There is only one controller in a Kafka cluster at any given time.
The controller is responsible for monitoring the active brokers in the cluster.

When any broker leaves the cluster, the controller reassigns its work to other brokers.

Controller selection is based on the first broker that starts in the cluster:
the first broker gets elected as controller by creating an ephemeral controller node in Zookeeper.

Other brokers also try to get elected as controller, but when they see the ephemeral controller node already exists, a "controller already exists" exception is raised.
They then start watching for the controller node to disappear.
When the controller dies, the ephemeral node disappears,
and then each broker tries to become the controller, but only one succeeds.


Kafka Distributed Architecture


Kafka partition data is self-contained, i.e. all segments and indexes of a partition are in the same directory.



Kafka Cluster


Every Kafka broker is responsible for managing one or more partitions.
A Kafka cluster is a group of brokers. These brokers may be running on individual machines, and those machines may be organised across multiple racks.
Example:
In the diagram we have a 6-node cluster using 2 RACKS.


Partition Allocation



There are 2 goals for partition distribution (goals, not rules):

  1. Partitions are distributed as evenly as possible to balance the workload
  2. Follower partitions must be placed on different machines than their leader

To distribute the partition replicas, Kafka follows these steps:

  1. Build an ordered list of brokers
  2. Assign leaders and followers
Kafka chooses a random broker as the first broker and places it in the list,

then puts the remaining brokers in order by taking brokers from alternate RACKS.





Once the broker list is prepared, Kafka assigns leaders and followers.
It makes sure that if a RACK fails, a backup copy is available on the other RACK, and if any broker fails, a backup copy is available on another machine.


First, leader partitions are assigned in sequence to the brokers from the list.




Once leaders are assigned, followers are assigned. Follower assignment starts by skipping the first broker and then assigning in sequence.


For the second follower it follows the same step, jumping one further from the previous start.




So here even distribution is not achieved:
Broker 0 has 4 partitions
and Broker 4 has 6 partitions.

But we have achieved fault tolerance:

even if RACK1 goes down, we will still have at least one copy of each partition on RACK2.
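A rough simulation of the assignment scheme described above (my own simplified sketch, not Kafka's actual algorithm; the broker order and counts are made up):

# Hypothetical sketch: leaders assigned round-robin over the ordered broker list,
# then each follower set assigned round-robin starting one broker further along.
brokers = [0, 3, 1, 4, 2, 5]        # ordered list built from alternate racks
num_partitions = 5
replication_factor = 3

assignment = {p: [] for p in range(num_partitions)}

for replica in range(replication_factor):          # 0 = leader, 1..N = followers
    for p in range(num_partitions):
        broker = brokers[(p + replica) % len(brokers)]
        assignment[p].append(broker)

for p, replicas in assignment.items():
    print(f"partition {p}: leader={replicas[0]}, followers={replicas[1:]}")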






The leader partition is responsible for all requests from producers and consumers.

A Kafka producer connects to any one broker and gets the metadata.
Every Kafka broker can answer the metadata request, hence a producer can connect to any broker and query for metadata.

The metadata response contains the list of leaders and their host and port information,
so the producer has the list of all leaders.

The producer communicates directly with the leader partition.
On receiving a message, the leader broker persists the message and sends back an acknowledgement.

Similarly, when a consumer wants messages, it always reads from the leader partition.

So the leader broker's responsibility is to interact with producers and consumers.

Followers do not communicate with producers and consumers. Their only job is to stay up to date with the leader.

The aim of a follower is to get elected as the leader when the current leader dies.

So their single-point agenda is to stay in sync with the leader; only then do they have a high chance of getting elected as leader.

To stay in sync, followers request data from the leader and persist it.


Sometimes a follower may be lagging behind the leader. The main reasons can be network congestion or broker failure.


The leader has one more important job: to maintain the list of in-sync replicas.
This list is known as the ISR (In-Sync Replicas) of the partition;
it is persisted in Zookeeper
and maintained by the leader broker.


The ISR list is important:
it contains the followers which are in sync with the leader,
so they are excellent candidates to be elected as leader if the current leader fails.

How does the leader know which followers are in sync?



When a replica requests data from a particular offset, the leader can work out how far behind the replica is,
and based on that it adds or removes the replica from the ISR list.

To judge "not too far", it checks whether the replica is less than 10 seconds behind.
If it is further behind, the leader removes it from the ISR; if it is within 10 seconds, it keeps it in the ISR.


There is one gotcha here:
what if all followers are behind by more than 10 seconds and the leader partition's broker crashes?



So now the ISR list (e.g. for P2) is empty.





This gives rise to 2 more concepts:
committed vs uncommitted messages,
minimum in-sync replicas.

We can configure the leader to not consider a message as committed until it is copied to all ISR replicas,
so the leader will have committed as well as uncommitted messages.




If we lose the leader, we will still have the committed messages on the followers, but we lose the uncommitted messages which were only on the leader.
Uncommitted messages shouldn't be a worry, because those messages can be resent by the producer.
The producer can choose to wait for the acknowledgement.

The producer waits for a timeout period and resends the message until it gets the acknowledgement.
So in case uncommitted messages are lost because of the failure of the leader, the newly elected leader will receive the uncommitted messages again from the producer.





MINIMUM IN-SYNC REPLICAS

Data is considered committed when it is written to all in-sync replicas.
Suppose we have 3 replicas and require them all to be in sync for a message to be considered committed,
and 2 replicas go down; the leader will remove them from the ISR. In this case, even though we configured 3 replicas, we are left with just one, which is the leader itself.
Data will then be considered committed as soon as it is on all ISR replicas, which in this case is just the leader. This is a risky scenario because data will be lost if we lose the leader.

Kafka protects against this by letting us set a minimum number of in-sync replicas for the topic.
If we want to be sure that data is written to at least 2 replicas, set min.insync.replicas=2.

So out of 3 replicas, if 2 are not in sync, the broker will not accept any messages, the partition becomes read-only,
and the producer will get a "not enough replicas" exception.
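The minimum can be set per topic, for example at creation time (same command style as earlier; the topic name is illustrative):

kafka-topics.bat --create --zookeeper localhost:2181 --topic invoice --partitions 5 --replication-factor 3 --config min.insync.replicas=2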







Kafka Producer Internals


0 = CreateTime: the producer's timestamp

1 = LogAppendTime: the time when the broker received the message

We can use only one of them;
the setting is message.timestamp.type = 0 by default (create timestamp).


We package the message content in a producer record object with at least 2 mandatory arguments:

topic name
message value = the main content

Optional items:
message key: critical; used for partitioning, grouping, joins
message timestamp: rarely set
target partition: rarely set

Once the record is ready, we hand it over to the Kafka producer using the send method.
The Kafka producer then sends the message to the broker over the network.
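A minimal sketch of building and sending such a record (assuming the kafka-python client; topic, key and value are illustrative):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# topic name and value are mandatory; key, partition and timestamp are optional
producer.send("invoice",
              value=b"invoice payload",
              key=b"store-42")       # key drives partitioning/grouping
producer.flush()                     # block until buffered records are sent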

Before the producer gives the message to the broker, it does many things.

Every record goes through:
Serializer: necessary to send data over the network. We have to specify a key serializer and a value serializer.
Partitioner: if a message key is present, a hashing algorithm is used; otherwise it uses round-robin partitioning.
Buffer: once serialized and partitioned, the record sits in a partition-wise buffer. The buffer holds records which have not yet been sent to the broker.

The producer also runs a background I/O thread that is responsible for turning these records into requests and transferring them to the broker.

Buffering is needed for asynchronous sending and network optimization.
The send method adds the record to the buffer and returns without blocking;
those records are then transmitted by the background I/O thread.
So the send method is not delayed by network operations.
Buffering also allows the I/O thread to combine multiple messages from the same buffer and transmit them as a single packet; this way we achieve better throughput.

If records are produced faster than they can be transmitted to the server, the buffer space will be exhausted.
The next send method call will then block for a few milliseconds until buffer space is freed by the I/O thread.

If the I/O thread takes a long time to free up the buffer, the send method raises a timeout exception.

If we see timeout errors, we have to increase the producer memory.
The default producer memory is 32 MB;
to increase it, change the buffer.memory config.

When the broker receives a message, it sends back a success acknowledgement.
If the broker fails to write, it returns an error.


If the I/O thread does not receive a success or error response, it may retry 2-3 times before giving up and throwing back an error.
The number of retries is set with the retries producer config.
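A sketch of wiring those settings together (assuming the kafka-python client, whose parameter names mirror the Java configs with underscores; the values are illustrative):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=str.encode,                          # key serializer
    value_serializer=lambda v: json.dumps(v).encode(),  # value serializer
    buffer_memory=33554432,   # 32 MB buffer (the default); raise it on timeout errors
    retries=3,                # how many times the I/O thread retries on failure
    acks="all",               # wait until the message is committed to the in-sync replicas
)
producer.send("invoice", key="store-42", value={"amount": 100})
producer.flush()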





At Least Once

Kafka sends the message to the leader partition, and then the followers sync up with the leader.
Once the minimum ISR replicas are updated per the configuration, the message is deemed committed and an acknowledgement is sent back to the producer's I/O thread.

If for some reason, say a network delay or error, the I/O thread doesn't get the acknowledgement, it sends the same message again, which will cause a duplicate.

There is no way for the broker to identify that this is a duplicate.
This is the AT-LEAST-ONCE mechanism of Kafka,
where Kafka wants at least one success acknowledgement.


There is an AT-MOST-ONCE concept too, which can be achieved by setting retries to 0.

Exactly once is also possible;
this is achieved by setting enable.idempotence=true.


Internal ID for the producer instance: the producer does an initial handshake with the leader broker and asks for a producer ID. On the broker side, the broker dynamically assigns a unique id to each producer.


Message sequence number: the producer assigns a monotonically increasing sequence number to each message.

So each message has a producer ID and a message sequence number,
so every time the leader gets a message, it checks the last received sequence number from that producer and can identify missing or duplicate messages.

Idempotence handles duplicates only for retries,
not for application-level duplicates.



Aggregation


Tumbling window

Hopping window (Sliding window)




















Wednesday, 21 July 2021

Python Asyncio Multithreading

AsyncIO is a relatively new framework to achieve concurrency in Python. In this article, I will compare it with traditional methods like multithreading and multiprocessing. Before jumping into examples, I will add a few refreshers about concurrency in Python.

  • CPython enforces GIL (Global Interpreter Lock) which prevents taking full advantage of multithreading. Each thread needs to acquire this mutually exclusive lock before running any bytecode
  • Multithreading is usually preferred for network I/O or disk I/O as threads need not compete hard among themselves for acquiring GIL.
  • Multiprocessing is usually preferred for CPU intensive tasks. Multiprocessing doesn’t need GIL as each process has its state, however, creating and destroying processes is not trivial.
  • Multithreading with threading module is preemptive, which entails voluntary and involuntary swapping of threads.
  • AsyncIO is a single thread single process cooperative multitasking. An asyncio task has exclusive use of CPU until it wishes to give it up to the coordinator or event loop. (Will cover the terminology later)

Hands-on Examples

Delay Messages

import logging
import time
logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                   9: "NINE", 10: "TEN"}

def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")

def main():
    logging.info("Main started")
    delay_message(2, num_word_mapping[2])
    delay_message(3, num_word_mapping[3])
    logging.info("Main Ended")

main()
12:39:00:MainThread:Main started
12:39:00:MainThread:TWO received
12:39:02:MainThread:Printing TWO
12:39:02:MainThread:THREE received
12:39:05:MainThread:Printing THREE
12:39:05:MainThread:Main Ended

Concurrency with Threading

import threading

def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")

def main():
    logging.info("Main started")
    threads = [threading.Thread(target=delay_message, args=(delay, message)) for delay, message in zip([2, 3], 
                                                                            [num_word_mapping[2], num_word_mapping[3]])]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join() # waits for thread to complete its task
    logging.info("Main Ended")
main()
12:39:05:MainThread:Main started
12:39:05:Thread-4:TWO received
12:39:05:Thread-5:THREE received
12:39:07:Thread-4:Printing TWO
12:39:08:Thread-5:Printing THREE
12:39:08:MainThread:Main Ended

Thread pool

import concurrent.futures as cf
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                    9: "NINE", 10: "TEN"}


def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")
    return message


if __name__ == '__main__':
    with cf.ThreadPoolExecutor(max_workers=2) as executor:
        future_to_mapping = {executor.submit(delay_message, i, num_word_mapping[i]): num_word_mapping[i] for i in
                             range(2, 4)}
        for future in cf.as_completed(future_to_mapping):
            logging.info(f"{future.result()} Done")
10:42:36:ThreadPoolExecutor-0_0:TWO received
10:42:36:ThreadPoolExecutor-0_1:THREE received
10:42:38:ThreadPoolExecutor-0_0:Printing TWO
10:42:38:MainThread:TWO Done
10:42:39:ThreadPoolExecutor-0_1:Printing THREE
10:42:39:MainThread:THREE Done

Concurrency with AsyncIO

  1. Event Loop Or Coordinator: Coroutine that manages other coroutines. You can think of it as a scheduler or master.
  2. Awaitable object Coroutine, Tasks, and Future are awaitable objects. A coroutine can await on awaitable objects. While a coroutine is awaiting on an awaitable object, its execution is temporarily suspended and resumed after Future is done.

import asyncio
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                   9: "NINE", 10: "TEN"}

async def delay_message(delay, message):
    logging.info(f"{message} received")
    await asyncio.sleep(delay) # time.sleep is blocking call. Hence, it cannot be awaited and we have to use asyncio.sleep
    logging.info(f"Printing {message}")
    
async def main():
    logging.info("Main started")
    logging.info(f'Current registered tasks: {len(asyncio.all_tasks())}')
    logging.info("Creating tasks")
    task_1 = asyncio.create_task(delay_message(2, num_word_mapping[2])) 
    task_2 = asyncio.create_task(delay_message(3, num_word_mapping[3]))
    logging.info(f'Current registered tasks: {len(asyncio.all_tasks())}')
    await task_1 # suspends execution of coroutine and gives control back to event loop while awaiting task completion.
    await task_2
    logging.info("Main Ended")

if __name__ == '__main__':
    
    asyncio.run(main()) # creates an event loop
07:35:32:MainThread:Main started
07:35:32:MainThread:Current registered tasks: 1
07:35:32:MainThread:Creating tasks
07:35:32:MainThread:Current registered tasks: 3
07:35:32:MainThread:TWO received
07:35:32:MainThread:THREE received
07:35:34:MainThread:Printing TWO
07:35:35:MainThread:Printing THREE
07:35:35:MainThread:Main Ended

Though the program is running on a single thread, it can achieve the same level of performance as the multithreaded code by cooperative multitasking.

A better way to create AsyncIO tasks

import asyncio
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                   9: "NINE", 10: "TEN"}

async def delay_message(delay, message):
    logging.info(f"{message} received")
    await asyncio.sleep(delay) # time.sleep is blocking call. Hence, it cannot be awaited and we have to use asyncio.sleep
    logging.info(f"Printing {message}")
    
async def main():
    logging.info("Main started")
    logging.info("Creating multiple tasks with asyncio.gather")
    await asyncio.gather(*[delay_message(i+1, num_word_mapping[i+1]) for i in range(5)]) # awaits completion of all tasks
    logging.info("Main Ended")

if __name__ == '__main__':
    
    asyncio.run(main()) # creates an event loop
08:09:20:MainThread:Main started
08:09:20:MainThread:ONE received
08:09:20:MainThread:TWO received
08:09:20:MainThread:THREE received
08:09:20:MainThread:FOUR received
08:09:20:MainThread:FIVE received
08:09:21:MainThread:Printing ONE
08:09:22:MainThread:Printing TWO
08:09:23:MainThread:Printing THREE
08:09:24:MainThread:Printing FOUR
08:09:25:MainThread:Printing FIVE
08:09:25:MainThread:Main Ended

Caution about Blocking Calls in AsyncIO Tasks

import asyncio
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                   9: "NINE", 10: "TEN"}

async def delay_message(delay, message):
    logging.info(f"{message} received")
    if message != 'THREE':
        await asyncio.sleep(delay) # non-blocking call. gives up execution
    else:
        time.sleep(delay) # blocking call
    logging.info(f"Printing {message}")
    
async def main():
    logging.info("Main started")
    logging.info("Creating multiple tasks with asyncio.gather")
    await asyncio.gather(*[delay_message(i+1, num_word_mapping[i+1]) for i in range(5)]) # awaits completion of all tasks
    logging.info("Main Ended")

if __name__ == '__main__':

    asyncio.run(main()) # creates an event loop
11:07:31:MainThread:Main started
11:07:31:MainThread:Creating multiple tasks with asyncio.gather
11:07:31:MainThread:ONE received
11:07:31:MainThread:TWO received
11:07:31:MainThread:THREE received
11:07:34:MainThread:Printing THREE
11:07:34:MainThread:FOUR received
11:07:34:MainThread:FIVE received
11:07:34:MainThread:Printing ONE
11:07:34:MainThread:Printing TWO
11:07:38:MainThread:Printing FOUR
11:07:39:MainThread:Printing FIVE
11:07:39:MainThread:Main Ended

When the delay_message receives message THREE, it makes a blocking call and doesn’t give up control to the event loop until it completes the task, thus stalling the progress of execution. Hence, it takes three seconds more than the previous run. Though this example seems to be tailored, it can happen if you are not careful. On the other hand, threading is preemptive, where OS preemptively switches thread if it is waiting on a blocking call.

Race Conditions

import concurrent.futures as cf
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2) # thread gets switched
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")
        
db = DbUpdate()
with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update) for _ in range(2)]
logging.info(f"Final value is {db.value}")
20:28:15:ThreadPoolExecutor-0_0:Update Started
20:28:15:ThreadPoolExecutor-0_0:Sleeping
20:28:15:ThreadPoolExecutor-0_1:Update Started
20:28:15:ThreadPoolExecutor-0_1:Sleeping
20:28:17:ThreadPoolExecutor-0_0:Reading Value From Db
20:28:17:ThreadPoolExecutor-0_1:Reading Value From Db
20:28:17:ThreadPoolExecutor-0_0:Updating Value
20:28:17:ThreadPoolExecutor-0_1:Updating Value
20:28:17:ThreadPoolExecutor-0_1:Update Finished
20:28:17:ThreadPoolExecutor-0_0:Update Finished
20:28:17:MainThread:Final value is 1

The final value should ideally be 2. However, due to preemptive swapping of threads, thread-0 got swapped out before updating the value, hence the updates were erroneous, producing a final value of 1. We have to use locks to prevent this from happening.


import concurrent.futures as cf
import logging
import time
import threading

LOCK = threading.Lock()

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2) # thread gets switched
        with LOCK:
            logging.info("Reading Value From Db")
            tmp = self.value**2 + 1
            logging.info("Updating Value")
            self.value = tmp
            logging.info("Update Finished")
        
db = DbUpdate()
with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update) for _ in range(2)]
logging.info(f"Final value is {db.value}")
21:02:45:ThreadPoolExecutor-0_0:Update Started
21:02:45:ThreadPoolExecutor-0_0:Sleeping
21:02:45:ThreadPoolExecutor-0_1:Update Started
21:02:45:ThreadPoolExecutor-0_1:Sleeping
21:02:47:ThreadPoolExecutor-0_0:Reading Value From Db
21:02:47:ThreadPoolExecutor-0_0:Updating Value
21:02:47:ThreadPoolExecutor-0_0:Update Finished
21:02:47:ThreadPoolExecutor-0_1:Reading Value From Db
21:02:47:ThreadPoolExecutor-0_1:Updating Value
21:02:47:ThreadPoolExecutor-0_1:Update Finished
21:02:47:MainThread:Final value is 2

Race Conditions are Rare with AsyncIO

import asyncio
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

class DbUpdate:
    def __init__(self):
        self.value = 0

    async def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        await asyncio.sleep(1)
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")
        
async def main():
    db = DbUpdate()
    await asyncio.gather(*[db.update() for _ in range(2)])
    logging.info(f"Final value is {db.value}")
    
asyncio.run(main())
20:35:49:MainThread:Update Started
20:35:49:MainThread:Sleeping
20:35:49:MainThread:Update Started
20:35:49:MainThread:Sleeping
20:35:50:MainThread:Reading Value From Db
20:35:50:MainThread:Updating Value
20:35:50:MainThread:Update Finished
20:35:50:MainThread:Reading Value From Db
20:35:50:MainThread:Updating Value
20:35:50:MainThread:Update Finished
20:35:50:MainThread:Final value is 2

As you can see, once the task got resumed after sleeping, it didn’t give up control until it completed the execution of the coroutine. With threading, thread swapping is not very obvious, but with asyncio, we can control exactly when the coroutine execution should be suspended. Nonetheless, it can go wrong when two coroutines enter a deadlock.


import asyncio 

async def foo():
    await boo()
    
async def boo():
    await foo()
    
async def main():
    await asyncio.gather(*[foo(), boo()])
    
asyncio.run(main())

Multiprocessing

Synchronous version



import concurrent.futures as cf
import logging
import math
import numpy as np
import time
import threading

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

def merge(l_1, l_2):
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) == 1:
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out

if __name__ == '__main__':
    logging.info("Starting Sorting")
    for r_list in r_lists:
        _ = merge_sort(r_list)
    logging.info("Sorting Completed")
21:24:07:MainThread:Starting Sorting
21:26:10:MainThread:Sorting Completed

Asynchronous version


import concurrent.futures as cf
import logging
import math
import numpy as np
import time
import threading

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

def merge(l_1, l_2):
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) == 1:
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out

if __name__ == '__main__':
    logging.info("Starting Sorting")
    with cf.ProcessPoolExecutor() as executor:
        sorted_lists_futures = [executor.submit(merge_sort, r_list) for r_list in r_lists]
    logging.info("Sorting Completed")
21:29:33:MainThread:Starting Sorting
21:30:03:MainThread:Sorting Completed

By default, the number of processes is equal to the number of processors on the machine. You can observe a considerable improvement in execution time between two versions.

taken from

https://medium.com/analytics-vidhya/asyncio-threading-and-multiprocessing-in-python-4f5ff6ca75e8