AsyncIO is a relatively new framework for achieving concurrency in Python. In this article, I compare it with the traditional approaches, multithreading and multiprocessing. Before jumping into examples, here are a few refreshers about concurrency in Python.
- CPython enforces the GIL (Global Interpreter Lock), which prevents taking full advantage of multithreading: each thread must acquire this mutually exclusive lock before running any bytecode.
- Multithreading is usually preferred for network I/O or disk I/O, because threads spend most of their time waiting on I/O and therefore don't compete hard among themselves for the GIL.
- Multiprocessing is usually preferred for CPU-intensive tasks. It sidesteps the GIL because each process has its own interpreter state; however, creating and destroying processes is not trivial.
- Multithreading with the threading module is preemptive, which entails both voluntary and involuntary swapping of threads (see the snippet after this list).
- AsyncIO is single-threaded, single-process cooperative multitasking: an asyncio task has exclusive use of the CPU until it willingly gives it up to the coordinator, or event loop. (The terminology is covered later.)
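As a quick illustration of the preemption bullet above (a sketch of mine, not from the original article): CPython exposes the interval after which a running thread is asked to release the GIL so that another thread can be scheduled.

import sys

# CPython asks the running thread to release the GIL after this many
# seconds so another thread can run (the default is 0.005).
print(sys.getswitchinterval())

# The interval is tunable; raising it can reduce switching overhead for
# CPU-bound threads, at the cost of scheduling latency.
sys.setswitchinterval(0.01)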
Hands-on Examples
Delay Messages
The program below delays printing a message. While the MainThread is sleeping, the CPU sits idle, which is a poor use of resources.
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")
num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT", 9: "NINE", 10: "TEN"}

def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)  # blocks the only thread; the CPU sits idle
    logging.info(f"Printing {message}")

def main():
    logging.info("Main started")
    delay_message(2, num_word_mapping[2])
    delay_message(3, num_word_mapping[3])
    logging.info("Main Ended")

main()
12:39:00:MainThread:Main started
12:39:00:MainThread:TWO received
12:39:02:MainThread:Printing TWO
12:39:02:MainThread:THREE received
12:39:05:MainThread:Printing THREE
12:39:05:MainThread:Main Ended
Concurrency with Threading
Here we use Python's threading module to run multiple invocations of delay_message on separate non-daemon threads. Unsurprisingly, the program finishes two seconds faster than the synchronous version above: the OS swaps a thread out while it is idle (sleeping). You can think of sleeping as a stand-in for any system call that waits on the external environment.
import logging
import threading
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")
num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT", 9: "NINE", 10: "TEN"}

def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")

def main():
    logging.info("Main started")
    threads = [threading.Thread(target=delay_message, args=(delay, message))
               for delay, message in zip([2, 3], [num_word_mapping[2], num_word_mapping[3]])]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # waits for the thread to complete its task
    logging.info("Main Ended")

main()
12:39:05:MainThread:Main started
12:39:05:Thread-4:TWO received
12:39:05:Thread-5:THREE received
12:39:07:Thread-4:Printing TWO
12:39:08:Thread-5:Printing THREE
12:39:08:MainThread:Main Ended
Thread pool
Though threads are lightweight, creating and destroying a large number of them is expensive. concurrent.futures is built on top of the threading module and provides a neat interface for creating thread pools and process pools. Instead of spawning a new thread for every function call, it reuses an existing thread from the pool.
import concurrent.futures as cf
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")
num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT", 9: "NINE", 10: "TEN"}

def delay_message(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")
    return message

if __name__ == '__main__':
    with cf.ThreadPoolExecutor(max_workers=2) as executor:
        future_to_mapping = {executor.submit(delay_message, i, num_word_mapping[i]): num_word_mapping[i]
                             for i in range(2, 4)}
        for future in cf.as_completed(future_to_mapping):
            logging.info(f"{future.result()} Done")
10:42:36:ThreadPoolExecutor-0_0:TWO received
10:42:36:ThreadPoolExecutor-0_1:THREE received
10:42:38:ThreadPoolExecutor-0_0:Printing TWO
10:42:38:MainThread:TWO Done
10:42:39:ThreadPoolExecutor-0_1:Printing THREE
10:42:39:MainThread:THREE Done
Concurrency with AsyncIO
- Coroutine: Unlike a conventional function with a single point of exit, a coroutine can pause and resume its execution. Creating one is as simple as putting the async keyword before a function definition.
- Event Loop or Coordinator: A coroutine that manages other coroutines. You can think of it as a scheduler or master.
- Awaitable object: Coroutines, Tasks, and Futures are awaitable objects. A coroutine can await on awaitable objects; while it is awaiting, its execution is temporarily suspended and resumes after the Future is done.
import asyncio
import logging

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")
num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT", 9: "NINE", 10: "TEN"}

async def delay_message(delay, message):
    logging.info(f"{message} received")
    await asyncio.sleep(delay)  # time.sleep is a blocking call and cannot be awaited; use asyncio.sleep instead
    logging.info(f"Printing {message}")

async def main():
    logging.info("Main started")
    logging.info(f'Current registered tasks: {len(asyncio.all_tasks())}')
    logging.info("Creating tasks")
    task_1 = asyncio.create_task(delay_message(2, num_word_mapping[2]))
    task_2 = asyncio.create_task(delay_message(3, num_word_mapping[3]))
    logging.info(f'Current registered tasks: {len(asyncio.all_tasks())}')
    await task_1  # suspends main and gives control back to the event loop until the task completes
    await task_2
    logging.info("Main Ended")

if __name__ == '__main__':
    asyncio.run(main())  # creates an event loop and runs main() on it
07:35:32:MainThread:Main started
07:35:32:MainThread:Current registered tasks: 1
07:35:32:MainThread:Creating tasks
07:35:32:MainThread:Current registered tasks: 3
07:35:32:MainThread:TWO received
07:35:32:MainThread:THREE received
07:35:34:MainThread:Printing TWO
07:35:35:MainThread:Printing THREE
07:35:35:MainThread:Main Ended
Though the program runs on a single thread, it achieves the same performance as the multithreaded code through cooperative multitasking.
A better way to create AsyncIO tasks
asyncio.gather lets you create and await multiple tasks in one shot.
import asyncio
import logging

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")
num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT", 9: "NINE", 10: "TEN"}

async def delay_message(delay, message):
    logging.info(f"{message} received")
    await asyncio.sleep(delay)  # non-blocking sleep; time.sleep cannot be awaited
    logging.info(f"Printing {message}")

async def main():
    logging.info("Main started")
    logging.info("Creating multiple tasks with asyncio.gather")
    await asyncio.gather(*[delay_message(i + 1, num_word_mapping[i + 1]) for i in range(5)])  # awaits completion of all tasks
    logging.info("Main Ended")

if __name__ == '__main__':
    asyncio.run(main())  # creates an event loop and runs main() on it
08:09:20:MainThread:Main started
08:09:20:MainThread:ONE received
08:09:20:MainThread:TWO received
08:09:20:MainThread:THREE received
08:09:20:MainThread:FOUR received
08:09:20:MainThread:FIVE received
08:09:21:MainThread:Printing ONE
08:09:22:MainThread:Printing TWO
08:09:23:MainThread:Printing THREE
08:09:24:MainThread:Printing FOUR
08:09:25:MainThread:Printing FIVE
08:09:25:MainThread:Main Ended
Caution about Blocking Calls in AsyncIO Tasks
As noted earlier, an asyncio task has exclusive use of the CPU until it volunteers to give it up. If a blocking call sneaks into your task by mistake, it will stall the progress of the whole program.
import asyncio
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")
num_word_mapping = {1: 'ONE', 2: 'TWO', 3: "THREE", 4: "FOUR", 5: "FIVE", 6: "SIX", 7: "SEVEN", 8: "EIGHT", 9: "NINE", 10: "TEN"}

async def delay_message(delay, message):
    logging.info(f"{message} received")
    if message != 'THREE':
        await asyncio.sleep(delay)  # non-blocking call; gives up execution
    else:
        time.sleep(delay)  # blocking call; stalls the event loop
    logging.info(f"Printing {message}")

async def main():
    logging.info("Main started")
    logging.info("Creating multiple tasks with asyncio.gather")
    await asyncio.gather(*[delay_message(i + 1, num_word_mapping[i + 1]) for i in range(5)])  # awaits completion of all tasks
    logging.info("Main Ended")

if __name__ == '__main__':
    asyncio.run(main())  # creates an event loop and runs main() on it
11:07:31:MainThread:Main started
11:07:31:MainThread:Creating multiple tasks with asyncio.gather
11:07:31:MainThread:ONE received
11:07:31:MainThread:TWO received
11:07:31:MainThread:THREE received
11:07:34:MainThread:Printing THREE
11:07:34:MainThread:FOUR received
11:07:34:MainThread:FIVE received
11:07:34:MainThread:Printing ONE
11:07:34:MainThread:Printing TWO
11:07:38:MainThread:Printing FOUR
11:07:39:MainThread:Printing FIVE
11:07:39:MainThread:Main Ended
When delay_message receives the message THREE, it makes a blocking call and doesn't give control back to the event loop until that call completes, stalling every other task. Hence, this run takes three seconds longer than the previous one. Though this example looks contrived, it can easily happen if you are not careful. Threading, by contrast, is preemptive: the OS switches a thread out while it waits on a blocking call.
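If a blocking call is genuinely unavoidable inside a coroutine, one escape hatch (not used in this article's examples) is asyncio.to_thread, available since Python 3.9, which runs the blocking function on a worker thread so the event loop stays responsive. A minimal sketch:

import asyncio
import time

def blocking_io(delay):
    time.sleep(delay)  # a blocking call we cannot rewrite
    return delay

async def main():
    # await the blocking function without stalling the event loop;
    # it runs in a separate worker thread behind the scenes
    result = await asyncio.to_thread(blocking_io, 2)
    print(f"Slept {result} seconds without blocking the loop")

asyncio.run(main())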
Race Conditions
Multithreaded code can quickly fall apart if it doesn't account for race conditions. Things get especially tricky with external libraries, since we need to verify that they support multithreaded use. For example, the Session object of the popular requests module is not thread-safe, so sharing one Session across threads to parallelize network requests can produce unintended results.
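A common workaround is to give every thread its own Session via threading.local. This is a sketch of mine, not from the original article; the fetch function and its url argument are illustrative.

import threading

import requests

thread_local = threading.local()

def get_session():
    # lazily create one Session per thread, so no Session object is
    # ever shared across threads
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def fetch(url):
    response = get_session().get(url)
    return response.status_code

The example below shows the underlying problem itself: a race condition on a simple shared object.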
import concurrent.futures as cf
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2)  # the thread gets switched while sleeping
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")

db = DbUpdate()
with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update) for _ in range(2)]
logging.info(f"Final value is {db.value}")
20:28:15:ThreadPoolExecutor-0_0:Update Started
20:28:15:ThreadPoolExecutor-0_0:Sleeping
20:28:15:ThreadPoolExecutor-0_1:Update Started
20:28:15:ThreadPoolExecutor-0_1:Sleeping
20:28:17:ThreadPoolExecutor-0_0:Reading Value From Db
20:28:17:ThreadPoolExecutor-0_1:Reading Value From Db
20:28:17:ThreadPoolExecutor-0_0:Updating Value
20:28:17:ThreadPoolExecutor-0_1:Updating Value
20:28:17:ThreadPoolExecutor-0_1:Update Finished
20:28:17:ThreadPoolExecutor-0_0:Update Finished
20:28:17:MainThread:Final value is 1
The final value should ideally be 2. However, due to preemptive thread swapping, thread 0 was swapped out after reading the value but before writing it back, so both threads read 0 and the final value ended up as 1. We have to use locks to prevent this from happening.
import concurrent.futures as cf
import logging
import threading
import time

LOCK = threading.Lock()
logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2)  # the thread gets switched while sleeping
        with LOCK:  # the read-modify-write below is now atomic across threads
            logging.info("Reading Value From Db")
            tmp = self.value**2 + 1
            logging.info("Updating Value")
            self.value = tmp
            logging.info("Update Finished")

db = DbUpdate()
with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update) for _ in range(2)]
logging.info(f"Final value is {db.value}")
21:02:45:ThreadPoolExecutor-0_0:Update Started
21:02:45:ThreadPoolExecutor-0_0:Sleeping
21:02:45:ThreadPoolExecutor-0_1:Update Started
21:02:45:ThreadPoolExecutor-0_1:Sleeping
21:02:47:ThreadPoolExecutor-0_0:Reading Value From Db
21:02:47:ThreadPoolExecutor-0_0:Updating Value
21:02:47:ThreadPoolExecutor-0_0:Update Finished
21:02:47:ThreadPoolExecutor-0_1:Reading Value From Db
21:02:47:ThreadPoolExecutor-0_1:Updating Value
21:02:47:ThreadPoolExecutor-0_1:Update Finished
21:02:47:MainThread:Final value is 2
Race Conditions are Rare with AsyncIO
Since a task has complete control over when it suspends its execution, race conditions are rare with asyncio.
import asyncio
import logging

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

class DbUpdate:
    def __init__(self):
        self.value = 0

    async def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        await asyncio.sleep(1)
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")

async def main():
    db = DbUpdate()
    await asyncio.gather(*[db.update() for _ in range(2)])
    logging.info(f"Final value is {db.value}")

asyncio.run(main())
20:35:49:MainThread:Update Started
20:35:49:MainThread:Sleeping
20:35:49:MainThread:Update Started
20:35:49:MainThread:Sleeping
20:35:50:MainThread:Reading Value From Db
20:35:50:MainThread:Updating Value
20:35:50:MainThread:Update Finished
20:35:50:MainThread:Reading Value From Db
20:35:50:MainThread:Updating Value
20:35:50:MainThread:Update Finished
20:35:50:MainThread:Final value is 2
As you can see, once a task resumed after sleeping, it didn't give up control until its coroutine finished. With threading, thread swaps are not very obvious; with asyncio, we control exactly where a coroutine suspends. Nonetheless, things can still go wrong when coroutines wait on each other, as in the snippet below.
import asyncio

async def foo():
    await boo()

async def boo():
    await foo()

async def main():
    # foo and boo await each other, so each call spawns another call;
    # the program makes no progress and eventually raises RecursionError
    await asyncio.gather(*[foo(), boo()])

asyncio.run(main())
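Conversely, when a coroutine genuinely must await in the middle of a read-modify-write, asyncio provides its own Lock. A minimal sketch of mine, reusing the shape of the DbUpdate example above:

import asyncio

class DbUpdate:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def update(self):
        # the lock spans the await, so no other coroutine can interleave
        # between the read and the write
        async with self.lock:
            tmp = self.value**2 + 1
            await asyncio.sleep(1)  # suspension point inside the critical section
            self.value = tmp

async def main():
    db = DbUpdate()
    await asyncio.gather(*[db.update() for _ in range(2)])
    print(db.value)  # always 2, since the updates cannot interleave

asyncio.run(main())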
Multiprocessing
As mentioned before, multiprocessing comes in really handy for CPU-intensive programs. The code below runs merge sort on 1000 lists of 30000 elements each. Bear with me if the implementation of merge sort is a bit clumsy.
Synchronous version
import logging
import math

import numpy as np

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

def merge(l_1, l_2):
    # merge two sorted lists into one sorted list
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) == 1:
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out

if __name__ == '__main__':
    logging.info("Starting Sorting")
    for r_list in r_lists:
        _ = merge_sort(r_list)
    logging.info("Sorting Completed")
21:24:07:MainThread:Starting Sorting
21:26:10:MainThread:Sorting Completed
Multiprocessing version
import concurrent.futures as cf
import logging
import math

import numpy as np

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

def merge(l_1, l_2):
    # merge two sorted lists into one sorted list
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) == 1:
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out

if __name__ == '__main__':
    logging.info("Starting Sorting")
    with cf.ProcessPoolExecutor() as executor:
        sorted_lists_futures = [executor.submit(merge_sort, r_list) for r_list in r_lists]
    logging.info("Sorting Completed")
21:29:33:MainThread:Starting Sorting
21:30:03:MainThread:Sorting Completed
By default, the number of worker processes equals the number of processors on the machine, and the improvement in execution time between the two versions is considerable.
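To make that default explicit: ProcessPoolExecutor accepts a max_workers argument, and executor.map is a compact alternative to submit when you want results back in input order. A minimal sketch of mine, reusing merge_sort and r_lists from the code above:

import os
import concurrent.futures as cf

if __name__ == '__main__':
    # max_workers defaults to os.cpu_count(); passing it explicitly
    # caps the number of worker processes
    with cf.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        # map preserves input order and yields results as they complete
        sorted_lists = list(executor.map(merge_sort, r_lists))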
Taken from https://medium.com/analytics-vidhya/asyncio-threading-and-multiprocessing-in-python-4f5ff6ca75e8