Wednesday, 21 July 2021

Python Asyncio Multithreading

AsyncIO is a relatively new framework for concurrency in Python. In this article, I compare it with traditional approaches such as multithreading and multiprocessing. Before jumping into the examples, here are a few refreshers about concurrency in Python.

  • CPython enforces the GIL (Global Interpreter Lock), which prevents threads from running Python bytecode in parallel and so limits the benefit of multithreading. Each thread must acquire this mutually exclusive lock before it can execute any bytecode.
  • Multithreading is usually preferred for network I/O or disk I/O, because a thread releases the GIL while it waits on I/O, so threads rarely have to compete for it.
  • Multiprocessing is usually preferred for CPU-intensive tasks. Each process has its own interpreter state and its own GIL, so processes do not contend for a shared lock; however, creating and destroying processes is comparatively expensive.
  • Multithreading with the threading module is preemptive: a thread may give up control voluntarily (for example while waiting on I/O) or be switched out involuntarily.
  • AsyncIO is single-threaded, single-process cooperative multitasking. An asyncio task has exclusive use of the CPU until it chooses to hand control back to the coordinator, or event loop. (The terminology is covered later.)
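To make the last bullet concrete, here is a minimal sketch of cooperative multitasking. The names worker, run_workers, and trace are made up for this illustration. Each coroutine runs uninterrupted until it reaches await asyncio.sleep(0), which hands control back to the event loop so that the other task gets a turn.

```python
import asyncio


async def worker(name, steps, trace):
    for step in range(steps):
        trace.append(f"{name}{step}")  # runs without interruption
        await asyncio.sleep(0)         # voluntarily yield to the event loop


async def run_workers():
    trace = []
    # gather schedules both coroutines as tasks on the same thread
    await asyncio.gather(worker("A", 3, trace), worker("B", 3, trace))
    return trace


trace = asyncio.run(run_workers())
print(trace)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```

The strict alternation comes from the explicit yield points; without the await, worker A would finish all its steps before worker B started.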

Hands-on Examples

Delay Messages

import logging
import time
logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                   9: "NINE", 10: "TEN"}

def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")

def main():
    logging.info("Main started")
    delay_message(2, num_word_mapping[2])
    delay_message(3, num_word_mapping[3])
    logging.info("Main Ended")

main()
12:39:00:MainThread:Main started
12:39:00:MainThread:TWO received
12:39:02:MainThread:Printing TWO
12:39:02:MainThread:THREE received
12:39:05:MainThread:Printing THREE
12:39:05:MainThread:Main Ended

Concurrency with Threading

import threading

def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")

def main():
    logging.info("Main started")
    # one thread per delay; the message is the word for that delay
    threads = [threading.Thread(target=delay_message, args=(delay, num_word_mapping[delay]))
               for delay in (2, 3)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # waits for the thread to complete its task
    logging.info("Main Ended")

main()
12:39:05:MainThread:Main started
12:39:05:Thread-4:TWO received
12:39:05:Thread-5:THREE received
12:39:07:Thread-4:Printing TWO
12:39:08:Thread-5:Printing THREE
12:39:08:MainThread:Main Ended

Thread pool

import concurrent.futures as cf
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                    9: "NINE", 10: "TEN"}


def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")
    return message


if __name__ == '__main__':
    with cf.ThreadPoolExecutor(max_workers=2) as executor:
        future_to_mapping = {executor.submit(delay_message, i, num_word_mapping[i]): num_word_mapping[i] for i in
                             range(2, 4)}
        for future in cf.as_completed(future_to_mapping):
            logging.info(f"{future.result()} Done")
10:42:36:ThreadPoolExecutor-0_0:TWO received
10:42:36:ThreadPoolExecutor-0_1:THREE received
10:42:38:ThreadPoolExecutor-0_0:Printing TWO
10:42:38:MainThread:TWO Done
10:42:39:ThreadPoolExecutor-0_1:Printing THREE
10:42:39:MainThread:THREE Done

Concurrency with AsyncIO

  1. Event loop or coordinator: the scheduler that runs and manages coroutines and tasks. It is not itself a coroutine; you can think of it as a scheduler or master.
  2. Awaitable objects: coroutines, Tasks, and Futures are awaitable objects. A coroutine can await an awaitable object. While it does so, its execution is suspended, and it resumes once the awaited object is done.
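The three kinds of awaitable can be sketched side by side as follows. The names compute and demo_awaitables are placeholders chosen for this example; the Future is completed by a timer callback only to show that a coroutine awaiting it resumes once a result is set.

```python
import asyncio


async def compute():
    return 42


async def demo_awaitables():
    loop = asyncio.get_running_loop()

    # 1. Awaiting a coroutine object runs it inline
    from_coroutine = await compute()

    # 2. Awaiting a Task: the coroutine was scheduled on the event loop
    task = asyncio.create_task(compute())
    from_task = await task

    # 3. Awaiting a Future: a low-level placeholder that something else completes
    future = loop.create_future()
    loop.call_later(0.1, future.set_result, "done")
    from_future = await future  # suspended here until set_result is called

    return from_coroutine, from_task, from_future


results = asyncio.run(demo_awaitables())
print(results)  # (42, 42, 'done')
```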

import asyncio
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                   9: "NINE", 10: "TEN"}

async def delay_message(delay, message):
    logging.info(f"{message} received")
    await asyncio.sleep(delay)  # time.sleep is a blocking call and cannot be awaited, so we use asyncio.sleep
    logging.info(f"Printing {message}")
    
async def main():
    logging.info("Main started")
    logging.info(f'Current registered tasks: {len(asyncio.all_tasks())}')
    logging.info("Creating tasks")
    task_1 = asyncio.create_task(delay_message(2, num_word_mapping[2])) 
    task_2 = asyncio.create_task(delay_message(3, num_word_mapping[3]))
    logging.info(f'Current registered tasks: {len(asyncio.all_tasks())}')
    await task_1 # suspends execution of coroutine and gives control back to event loop while awaiting task completion.
    await task_2
    logging.info("Main Ended")

if __name__ == '__main__':
    
    asyncio.run(main())  # creates an event loop and runs main() in it
07:35:32:MainThread:Main started
07:35:32:MainThread:Current registered tasks: 1
07:35:32:MainThread:Creating tasks
07:35:32:MainThread:Current registered tasks: 3
07:35:32:MainThread:TWO received
07:35:32:MainThread:THREE received
07:35:34:MainThread:Printing TWO
07:35:35:MainThread:Printing THREE
07:35:35:MainThread:Main Ended

Although the program runs on a single thread, cooperative multitasking lets it finish in the same time as the multithreaded version (about three seconds here), because both tasks spend their time waiting rather than computing.
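A rough way to check this claim is to time two concurrent waits and record which thread ran them. This is only a sketch: wait_and_report and timed_run are names invented for the example, and the delays are shortened to fractions of a second.

```python
import asyncio
import threading
import time


async def wait_and_report(delay):
    await asyncio.sleep(delay)
    return threading.get_ident()  # id of the thread that ran this coroutine


async def timed_run():
    start = time.perf_counter()
    thread_ids = await asyncio.gather(wait_and_report(0.2), wait_and_report(0.3))
    elapsed = time.perf_counter() - start
    return elapsed, set(thread_ids)


elapsed, thread_ids = asyncio.run(timed_run())
print(f"elapsed: {elapsed:.2f}s on {len(thread_ids)} thread(s)")
```

The elapsed time should be close to the longest delay (0.3 s) rather than the sum (0.5 s), and both coroutines should report the same thread id.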

A better way to create AsyncIO tasks

import asyncio
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                   9: "NINE", 10: "TEN"}

async def delay_message(delay, message):
    logging.info(f"{message} received")
    await asyncio.sleep(delay)  # time.sleep is a blocking call and cannot be awaited, so we use asyncio.sleep
    logging.info(f"Printing {message}")
    
async def main():
    logging.info("Main started")
    logging.info("Creating multiple tasks with asyncio.gather")
    await asyncio.gather(*[delay_message(i+1, num_word_mapping[i+1]) for i in range(5)]) # awaits completion of all tasks
    logging.info("Main Ended")

if __name__ == '__main__':
    
    asyncio.run(main())  # creates an event loop and runs main() in it
08:09:20:MainThread:Main started
08:09:20:MainThread:Creating multiple tasks with asyncio.gather
08:09:20:MainThread:ONE received
08:09:20:MainThread:TWO received
08:09:20:MainThread:THREE received
08:09:20:MainThread:FOUR received
08:09:20:MainThread:FIVE received
08:09:21:MainThread:Printing ONE
08:09:22:MainThread:Printing TWO
08:09:23:MainThread:Printing THREE
08:09:24:MainThread:Printing FOUR
08:09:25:MainThread:Printing FIVE
08:09:25:MainThread:Main Ended

Caution about Blocking Calls in AsyncIO Tasks

import asyncio
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT",
                   9: "NINE", 10: "TEN"}

async def delay_message(delay, message):
    logging.info(f"{message} received")
    if message != 'THREE':
        await asyncio.sleep(delay) # non-blocking call. gives up execution
    else:
        time.sleep(delay) # blocking call
    logging.info(f"Printing {message}")
    
async def main():
    logging.info("Main started")
    logging.info("Creating multiple tasks with asyncio.gather")
    await asyncio.gather(*[delay_message(i+1, num_word_mapping[i+1]) for i in range(5)]) # awaits completion of all tasks
    logging.info("Main Ended")

if __name__ == '__main__':

    asyncio.run(main())  # creates an event loop and runs main() in it
11:07:31:MainThread:Main started
11:07:31:MainThread:Creating multiple tasks with asyncio.gather
11:07:31:MainThread:ONE received
11:07:31:MainThread:TWO received
11:07:31:MainThread:THREE received
11:07:34:MainThread:Printing THREE
11:07:34:MainThread:FOUR received
11:07:34:MainThread:FIVE received
11:07:34:MainThread:Printing ONE
11:07:34:MainThread:Printing TWO
11:07:38:MainThread:Printing FOUR
11:07:39:MainThread:Printing FIVE
11:07:39:MainThread:Main Ended

When delay_message receives the message THREE, it makes a blocking call and does not hand control back to the event loop until the call returns, which stalls every other task. As a result, the run takes three seconds longer than the previous one (eight seconds instead of five). Although this example is contrived, the same thing can happen in real code if you are not careful. Threading, by contrast, is preemptive: the OS can switch to another thread while one thread waits on a blocking call.
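If a blocking call cannot be avoided, one common workaround is to hand it to a worker thread so the event loop stays free. The sketch below uses loop.run_in_executor with the default thread pool; blocking_io, non_blocking_io, and run_mixed are hypothetical names, and time.sleep merely stands in for any blocking function.

```python
import asyncio
import time


def blocking_io(delay):
    time.sleep(delay)  # stands in for any blocking call
    return f"blocking call finished after {delay}s"


async def non_blocking_io(delay):
    await asyncio.sleep(delay)
    return f"async call finished after {delay}s"


async def run_mixed():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io, 0.3),  # None selects the default thread pool
        non_blocking_io(0.2),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(run_mixed())
print(results, f"{elapsed:.2f}s")
```

Because the blocking call runs in another thread, the two waits overlap and the total should be close to 0.3 s instead of 0.5 s. On Python 3.9 and later, asyncio.to_thread offers a shorter spelling of the same idea.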

Race Conditions

import concurrent.futures as cf
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2) # thread gets switched
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")
        
db = DbUpdate()
with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update) for _ in range(2)]
logging.info(f"Final value is {db.value}")
20:28:15:ThreadPoolExecutor-0_0:Update Started
20:28:15:ThreadPoolExecutor-0_0:Sleeping
20:28:15:ThreadPoolExecutor-0_1:Update Started
20:28:15:ThreadPoolExecutor-0_1:Sleeping
20:28:17:ThreadPoolExecutor-0_0:Reading Value From Db
20:28:17:ThreadPoolExecutor-0_1:Reading Value From Db
20:28:17:ThreadPoolExecutor-0_0:Updating Value
20:28:17:ThreadPoolExecutor-0_1:Updating Value
20:28:17:ThreadPoolExecutor-0_1:Update Finished
20:28:17:ThreadPoolExecutor-0_0:Update Finished
20:28:17:MainThread:Final value is 1

The final value should ideally be 2. However, because threads are swapped preemptively, both threads read the value 0 before either wrote its result back, so each stored 1 and one update was lost. We have to use a lock to prevent this from happening.


import concurrent.futures as cf
import logging
import time
import threading

LOCK = threading.Lock()

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2) # thread gets switched
        with LOCK:
            logging.info("Reading Value From Db")
            tmp = self.value**2 + 1
            logging.info("Updating Value")
            self.value = tmp
            logging.info("Update Finished")
        
db = DbUpdate()
with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update) for _ in range(2)]
logging.info(f"Final value is {db.value}")
21:02:45:ThreadPoolExecutor-0_0:Update Started
21:02:45:ThreadPoolExecutor-0_0:Sleeping
21:02:45:ThreadPoolExecutor-0_1:Update Started
21:02:45:ThreadPoolExecutor-0_1:Sleeping
21:02:47:ThreadPoolExecutor-0_0:Reading Value From Db
21:02:47:ThreadPoolExecutor-0_0:Updating Value
21:02:47:ThreadPoolExecutor-0_0:Update Finished
21:02:47:ThreadPoolExecutor-0_1:Reading Value From Db
21:02:47:ThreadPoolExecutor-0_1:Updating Value
21:02:47:ThreadPoolExecutor-0_1:Update Finished
21:02:47:MainThread:Final value is 2

Race Conditions are Rare with AsyncIO

import asyncio
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

class DbUpdate:
    def __init__(self):
        self.value = 0

    async def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        await asyncio.sleep(1)
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")
        
async def main():
    db = DbUpdate()
    await asyncio.gather(*[db.update() for _ in range(2)])
    logging.info(f"Final value is {db.value}")
    
asyncio.run(main())
20:35:49:MainThread:Update Started
20:35:49:MainThread:Sleeping
20:35:49:MainThread:Update Started
20:35:49:MainThread:Sleeping
20:35:50:MainThread:Reading Value From Db
20:35:50:MainThread:Updating Value
20:35:50:MainThread:Update Finished
20:35:50:MainThread:Reading Value From Db
20:35:50:MainThread:Updating Value
20:35:50:MainThread:Update Finished
20:35:50:MainThread:Final value is 2

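As you can see, once a task resumed after sleeping, it did not give up control until the coroutine had finished. With threading, it is not obvious where threads get swapped, but with asyncio we control exactly where a coroutine may be suspended: only at an await. Nonetheless, things can still go wrong, for example when two coroutines wait on each other. In the example below, foo and boo await each other endlessly; strictly speaking this is unbounded mutual recursion rather than a classic deadlock, and in CPython it typically ends with a RecursionError instead of hanging.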


import asyncio 

async def foo():
    await boo()
    
async def boo():
    await foo()
    
async def main():
    await asyncio.gather(*[foo(), boo()])
    
asyncio.run(main())
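Race conditions being rare with asyncio does not make them impossible. The sketch below, with made-up names unsafe_increment, safe_increment, and run_demo, shows that placing an await between the read and the write brings the lost update back, and that asyncio.Lock restores the expected result. The lock is created inside the running coroutine so that it belongs to the active event loop.

```python
import asyncio


async def unsafe_increment(state):
    current = state["value"]      # read
    await asyncio.sleep(0)        # suspension point between read and write
    state["value"] = current + 1  # write based on a possibly stale read


async def safe_increment(state, lock):
    async with lock:              # only one coroutine at a time in this section
        current = state["value"]
        await asyncio.sleep(0)
        state["value"] = current + 1


async def run_demo():
    unsafe_state = {"value": 0}
    await asyncio.gather(unsafe_increment(unsafe_state), unsafe_increment(unsafe_state))

    safe_state = {"value": 0}
    lock = asyncio.Lock()
    await asyncio.gather(safe_increment(safe_state, lock), safe_increment(safe_state, lock))
    return unsafe_state["value"], safe_state["value"]


unsafe_total, safe_total = asyncio.run(run_demo())
print(unsafe_total, safe_total)  # 1 2
```

In the unsafe version both coroutines read 0 before either writes, so one increment is lost; with the lock, the second coroutine cannot read until the first has written.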

Multiprocessing

Synchronous version



import concurrent.futures as cf
import logging
import math
import numpy as np
import time
import threading

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

def merge(l_1, l_2):
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) <= 1:  # also covers an empty list, which would otherwise recurse forever
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out

if __name__ == '__main__':
    logging.info("Starting Sorting")
    for r_list in r_lists:
        _ = merge_sort(r_list)
    logging.info("Sorting Completed")
21:24:07:MainThread:Starting Sorting
21:26:10:MainThread:Sorting Completed

Asynchronous version


import concurrent.futures as cf
import logging
import math
import numpy as np
import time
import threading

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

def merge(l_1, l_2):
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) <= 1:  # also covers an empty list, which would otherwise recurse forever
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out

if __name__ == '__main__':
    logging.info("Starting Sorting")
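    with cf.ProcessPoolExecutor() as executor:
        sorted_lists_futures = [executor.submit(merge_sort, r_list) for r_list in r_lists]
    # result() re-raises any exception from a worker, which would otherwise go unnoticed
    sorted_lists = [future.result() for future in sorted_lists_futures]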
    logging.info("Sorting Completed")
21:29:33:MainThread:Starting Sorting
21:30:03:MainThread:Sorting Completed

By default, the number of worker processes equals the number of processors on the machine. Here the run time drops from about two minutes (21:24:07 to 21:26:10) to about 30 seconds (21:29:33 to 21:30:03), a considerable improvement.

taken from

https://medium.com/analytics-vidhya/asyncio-threading-and-multiprocessing-in-python-4f5ff6ca75e8

