Python asyncio Cheat Sheet
Core asyncio patterns for writing concurrent I/O-bound Python programs with async and await.
Export
Copy the compact sheet, download it, or print it.
Download
`D` dense toggle · `C` copy all
## Getting Started
## Tasks and Concurrency
import asyncio
async def ok():
await asyncio.sleep(0.1)
return "ok"
async def bad():
await asyncio.sleep(0.1)
raise ValueError("boom")
async def main():
results = await asyncio.gather(ok(), bad(), return_exceptions=True)
print(results)
asyncio.run(main())# Keep other tasks running even if one fails.
import asyncio
async def worker(name, delay):
await asyncio.sleep(delay)
print(name)
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(worker("a", 0.2))
tg.create_task(worker("b", 0.1))
asyncio.run(main())# Run related tasks under a context manager.
import asyncio
async def worker(name, delay):
await asyncio.sleep(delay)
return name
async def main():
tasks = {asyncio.create_task(worker("fast", 0.1)), asyncio.create_task(worker("slow", 1))}
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
print(task.result())
for task in pending:
task.cancel()
asyncio.run(main())# Use `asyncio.wait()` to control waiting behavior.
import asyncio
async def worker(name, delay):
await asyncio.sleep(delay)
return name
async def main():
tasks = [worker("a", 0.3), worker("b", 0.1), worker("c", 0.2)]
for future in asyncio.as_completed(tasks):
print(await future)
asyncio.run(main())# Process faster tasks first using `as_completed()`.
## Cancellation and Timeouts
import asyncio
async def worker():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("clean up before exit")
raise
async def main():
task = asyncio.create_task(worker())
await asyncio.sleep(0.1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("task cancelled")
asyncio.run(main())# Request cancellation and await the task to observe it.
import asyncio
async def worker():
await asyncio.sleep(1)
return "finished"
async def main():
task = asyncio.create_task(worker())
try:
await asyncio.wait_for(asyncio.shield(task), timeout=0.1)
except TimeoutError:
print("caller timed out")
print(await task)
asyncio.run(main())# Prevent cancellation from immediately propagating to a task.
## Queues and Producer/Consumer Patterns
import asyncio
async def producer(queue):
for item in [1, 2, 3]:
await queue.put(item)
await queue.put(None)
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(item)
queue.task_done()
async def main():
queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
await queue.join()
asyncio.run(main())# Exchange work between producers and consumers.
## Synchronization Primitives
import asyncio
counter = 0
lock = asyncio.Lock()
async def increment():
global counter
async with lock:
current = counter
await asyncio.sleep(0)
counter = current + 1
async def main():
await asyncio.gather(*(increment() for _ in range(100)))
print(counter)
asyncio.run(main())# Prevent overlapping access to a critical section.
import asyncio
event = asyncio.Event()
async def waiter():
await event.wait()
print("event received")
async def setter():
await asyncio.sleep(0.2)
event.set()
async def main():
await asyncio.gather(waiter(), setter())
asyncio.run(main())# Wake waiting tasks once a condition becomes true.
import asyncio
sem = asyncio.Semaphore(3)
async def fetch(i):
async with sem:
print("start", i)
await asyncio.sleep(0.2)
print("done", i)
async def main():
await asyncio.gather(*(fetch(i) for i in range(8)))
asyncio.run(main())# Throttle parallel access to a finite resource.
More in Python asyncio
Python asyncio Advanced
Advanced loop management, low-level scheduling, and platform-aware asyncio usage.
Python asyncio Recipes
Real-world asyncio recipes for HTTP concurrency, retries, and pipelines.
Python asyncio Streams and Subprocesses
Network streams, subprocesses, and non-blocking integration patterns.