Python asyncio Cheat Sheet

Core asyncio patterns for writing concurrent I/O-bound Python programs with async and await.

View
StandardDetailedCompact
Export
Copy the compact sheet, download it, or print it.
Download
`D` dense toggle · `C` copy all
## Getting Started
Run an async entrypoint
import asyncio

async def main():
    print("hello")
    await asyncio.sleep(1)
    print("world")

asyncio.run(main())

# Execute a top-level coroutine and manage the event loop automatically.

Define and await a coroutine
import asyncio

async def fetch_name():
    await asyncio.sleep(0.2)
    return "jon"

async def main():
    name = await fetch_name()
    print(name)

asyncio.run(main())

# Basic async function and await pattern.

Use asyncio.Runner for multiple async calls
import asyncio

async def ping(name):
    await asyncio.sleep(0.1)
    return f"pong:{name}"

with asyncio.Runner() as runner:
    print(runner.run(ping("a")))
    print(runner.run(ping("b")))

# Reuse a managed loop for multiple top-level async calls.

Get the running loop and monotonic time
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    await asyncio.sleep(0.5)
    print(loop.time() - start)

asyncio.run(main())

# Use the loop clock for scheduling and deadlines.

## Tasks and Concurrency
Create a background task
import asyncio

async def worker():
    await asyncio.sleep(1)
    print("done")

async def main():
    task = asyncio.create_task(worker())
    await task

asyncio.run(main())

# Schedule a coroutine to run concurrently.

Run tasks concurrently with gather
import asyncio

async def square(x):
    await asyncio.sleep(0.1)
    return x * x

async def main():
    results = await asyncio.gather(square(2), square(3), square(4))
    print(results)

asyncio.run(main())

# Wait for multiple coroutines and collect results.

Collect exceptions with gather
import asyncio

async def ok():
    await asyncio.sleep(0.1)
    return "ok"

async def bad():
    await asyncio.sleep(0.1)
    raise ValueError("boom")

async def main():
    results = await asyncio.gather(ok(), bad(), return_exceptions=True)
    print(results)

asyncio.run(main())

# Keep other tasks running even if one fails.

Use TaskGroup for structured concurrency
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    print(name)

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("a", 0.2))
        tg.create_task(worker("b", 0.1))

asyncio.run(main())

# Run related tasks under a context manager.

Wait until the first task completes
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = {asyncio.create_task(worker("fast", 0.1)), asyncio.create_task(worker("slow", 1))}
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(task.result())
    for task in pending:
        task.cancel()

asyncio.run(main())

# Use `asyncio.wait()` to control waiting behavior.

Consume results as tasks finish
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [worker("a", 0.3), worker("b", 0.1), worker("c", 0.2)]
    for future in asyncio.as_completed(tasks):
        print(await future)

asyncio.run(main())

# Process faster tasks first using `as_completed()`.

## Cancellation and Timeouts
Cancel a task
import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("clean up before exit")
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled")

asyncio.run(main())

# Request cancellation and await the task to observe it.

Use `asyncio.timeout()`
import asyncio

async def main():
    try:
        async with asyncio.timeout(0.2):
            await asyncio.sleep(1)
    except TimeoutError:
        print("timed out")

asyncio.run(main())

# Apply a deadline with an async context manager.

Use `asyncio.wait_for()`
import asyncio

async def slow():
    await asyncio.sleep(2)

async def main():
    try:
        await asyncio.wait_for(slow(), timeout=0.5)
    except TimeoutError:
        print("too slow")

asyncio.run(main())

# Wrap an awaitable with a timeout.

Shield a task from outer cancellation
import asyncio

async def worker():
    await asyncio.sleep(1)
    return "finished"

async def main():
    task = asyncio.create_task(worker())
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=0.1)
    except TimeoutError:
        print("caller timed out")
    print(await task)

asyncio.run(main())

# Prevent cancellation from immediately propagating to a task.

## Queues and Producer/Consumer Patterns
Use an asyncio.Queue
import asyncio

async def producer(queue):
    for item in [1, 2, 3]:
        await queue.put(item)
    await queue.put(None)

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))
    await queue.join()

asyncio.run(main())

# Exchange work between producers and consumers.

Add a timeout to queue operations
import asyncio

async def main():
    queue = asyncio.Queue()
    try:
        item = await asyncio.wait_for(queue.get(), timeout=1)
        print(item)
    except TimeoutError:
        print("no item available")

asyncio.run(main())

# Wrap queue operations with `wait_for()`.

Use a PriorityQueue
import asyncio

async def main():
    queue = asyncio.PriorityQueue()
    await queue.put((20, "normal"))
    await queue.put((10, "urgent"))
    print(await queue.get())
    print(await queue.get())

asyncio.run(main())

# Process lower-number priorities first.

## Synchronization Primitives
Protect shared state with a Lock
import asyncio

counter = 0
lock = asyncio.Lock()

async def increment():
    global counter
    async with lock:
        current = counter
        await asyncio.sleep(0)
        counter = current + 1

async def main():
    await asyncio.gather(*(increment() for _ in range(100)))
    print(counter)

asyncio.run(main())

# Prevent overlapping access to a critical section.

Signal readiness with an Event
import asyncio

event = asyncio.Event()

async def waiter():
    await event.wait()
    print("event received")

async def setter():
    await asyncio.sleep(0.2)
    event.set()

async def main():
    await asyncio.gather(waiter(), setter())

asyncio.run(main())

# Wake waiting tasks once a condition becomes true.

Limit concurrency with a Semaphore
import asyncio

sem = asyncio.Semaphore(3)

async def fetch(i):
    async with sem:
        print("start", i)
        await asyncio.sleep(0.2)
        print("done", i)

async def main():
    await asyncio.gather(*(fetch(i) for i in range(8)))

asyncio.run(main())

# Throttle parallel access to a finite resource.

Recommended next

No recommendations yet.