Python asyncio Cheat Sheet

Core asyncio patterns for writing concurrent I/O-bound Python programs with async and await.

Getting Started

Run coroutines, sleep, and structure your entrypoint.

Run an async entrypoint

Execute a top-level coroutine and manage the event loop automatically.

python
import asyncio

async def main():
    print("hello")
    await asyncio.sleep(1)
    print("world")

asyncio.run(main())
Notes

Use `asyncio.run()` for most top-level programs. It creates the event loop, runs the coroutine, and closes the loop.
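`asyncio.run()` also returns whatever the coroutine returns, which is a convenient way to hand a result back to synchronous code. A minimal sketch; the `compute` coroutine is a made-up stand-in for real async work:

```python
import asyncio

async def compute():
    # Hypothetical coroutine; the sleep stands in for real I/O.
    await asyncio.sleep(0.01)
    return 42

# asyncio.run() creates a loop, runs compute() to completion,
# closes the loop, and returns the coroutine's return value.
answer = asyncio.run(compute())
print(answer)
```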

Define and await a coroutine

Basic async function and await pattern.

python
import asyncio

async def fetch_name():
    await asyncio.sleep(0.2)
    return "jon"

async def main():
    name = await fetch_name()
    print(name)

asyncio.run(main())
Notes

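Calling an `async def` function returns a coroutine object without running its body; the body runs only when the coroutine is awaited or scheduled as a task. `await` is valid only inside an `async def` function.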
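The difference between calling a coroutine function and awaiting its result can be checked directly. A small sketch, reusing the `fetch_name` example above with a shorter delay:

```python
import asyncio
import inspect

async def fetch_name():
    await asyncio.sleep(0.01)
    return "jon"

async def main():
    coro = fetch_name()                   # creates a coroutine object; nothing has run yet
    is_coro = inspect.iscoroutine(coro)   # confirm what the call returned
    name = await coro                     # the body of fetch_name() runs here
    return is_coro, name

is_coro, name = asyncio.run(main())
print(is_coro, name)
```

A coroutine object that is created but never awaited triggers a "coroutine ... was never awaited" RuntimeWarning, which is usually a sign of a missing `await`.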

Use asyncio.Runner for multiple async calls

Reuse a managed loop for multiple top-level async calls.

python
import asyncio

async def ping(name):
    await asyncio.sleep(0.1)
    return f"pong:{name}"

with asyncio.Runner() as runner:
    print(runner.run(ping("a")))
    print(runner.run(ping("b")))
Notes

Useful when synchronous code needs to run several async entrypoints on the same event loop. `asyncio.Runner` requires Python 3.11 or later.

Get the running loop and monotonic time

Use the loop clock for scheduling and deadlines.

python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    await asyncio.sleep(0.5)
    print(loop.time() - start)

asyncio.run(main())
Notes

Use `loop.time()`, the event loop's monotonic clock, for deadlines and timeouts. Wall-clock time can jump when the system clock is adjusted.
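One way to use the loop clock for a deadline is to share a single time budget across several steps. The sketch below is illustrative only: `step` and the delays are invented, and it catches `asyncio.TimeoutError` so that it also works on Python versions before 3.11.

```python
import asyncio

async def step(delay):
    # Hypothetical unit of work; the sleep stands in for real I/O.
    await asyncio.sleep(delay)
    return delay

async def main():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 0.3            # one budget shared by all steps
    completed = []
    for delay in (0.01, 0.01, 5.0):
        remaining = deadline - loop.time()  # time left on the loop's clock
        try:
            completed.append(await asyncio.wait_for(step(delay), timeout=remaining))
        except asyncio.TimeoutError:
            break                           # budget exhausted; stop early
    return completed

completed = asyncio.run(main())
print(completed)
```

Computing `remaining` before each step means slow early steps leave less time for later ones, instead of every step getting a fresh timeout.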

Tasks and Concurrency

Run coroutines concurrently with tasks, gather, wait, and task groups.

Create a background task

Schedule a coroutine to run concurrently.

python
import asyncio

async def worker():
    await asyncio.sleep(1)
    print("done")

async def main():
    task = asyncio.create_task(worker())
    await task

asyncio.run(main())
Notes

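A Task wraps a coroutine and schedules it on the event loop. The loop keeps only weak references to tasks, so hold a reference until the task finishes; otherwise it may be garbage-collected before it completes.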
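For fire-and-forget work, a common pattern is to park each task in a set and remove it from a done callback. A minimal sketch under that assumption; `background_tasks` and `worker` are illustrative names, and `main()` awaits the tasks at the end only so the example finishes deterministically:

```python
import asyncio

background_tasks = set()   # strong references to in-flight tasks

async def worker(n):
    await asyncio.sleep(0.01)
    return n

async def main():
    for n in range(3):
        task = asyncio.create_task(worker(n))
        background_tasks.add(task)                         # keep the task alive
        task.add_done_callback(background_tasks.discard)   # drop it once finished
    # Wait for the workers here so the sketch has a clear end.
    await asyncio.gather(*background_tasks)
    return len(background_tasks)

remaining = asyncio.run(main())
print(remaining)
```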

Run tasks concurrently with gather

Wait for multiple coroutines and collect results.

python
import asyncio

async def square(x):
    await asyncio.sleep(0.1)
    return x * x

async def main():
    results = await asyncio.gather(square(2), square(3), square(4))
    print(results)

asyncio.run(main())
Notes

Use `gather()` to run multiple awaitables concurrently and preserve result order.
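Result order follows argument order, not completion order. The sketch below records the completion order separately to make the difference visible; `job` and `finished` are illustrative names:

```python
import asyncio

finished = []   # completion order, recorded for comparison

async def job(name, delay):
    await asyncio.sleep(delay)
    finished.append(name)
    return name

async def main():
    # "slow" is passed first, so its result comes first in the list.
    return await asyncio.gather(job("slow", 0.05), job("fast", 0.01))

results = asyncio.run(main())
print(results)    # argument order
print(finished)   # completion order
```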

Collect exceptions with gather

Return failures alongside results instead of raising the first exception.

python
import asyncio

async def ok():
    await asyncio.sleep(0.1)
    return "ok"

async def bad():
    await asyncio.sleep(0.1)
    raise ValueError("boom")

async def main():
    results = await asyncio.gather(ok(), bad(), return_exceptions=True)
    print(results)

asyncio.run(main())
Notes

With `return_exceptions=True`, exceptions are returned in the result list instead of being raised immediately.
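Because exceptions come back mixed in with normal values, the caller usually has to separate them. One possible way to do that, sketched with `isinstance` checks:

```python
import asyncio

async def ok():
    await asyncio.sleep(0.01)
    return "ok"

async def bad():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    results = await asyncio.gather(ok(), bad(), return_exceptions=True)
    # Split the mixed list into successful values and captured exceptions.
    values = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return values, errors

values, errors = asyncio.run(main())
print(values, errors)
```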

Use TaskGroup for structured concurrency

Run related tasks under a context manager.

python
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    print(name)

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("a", 0.2))
        tg.create_task(worker("b", 0.1))

asyncio.run(main())
Notes

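Task groups provide a safer way to manage related tasks. If one task fails, the group cancels the remaining tasks and raises an `ExceptionGroup` when the `async with` block exits. `asyncio.TaskGroup` requires Python 3.11 or later.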

Wait until the first task completes

Use `asyncio.wait()` to control waiting behavior.

python
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = {asyncio.create_task(worker("fast", 0.1)), asyncio.create_task(worker("slow", 1))}
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(task.result())
    for task in pending:
        task.cancel()

asyncio.run(main())
Notes

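Useful for races. `asyncio.wait()` does not cancel the pending tasks for you, so cancel them explicitly. Pass Task or Future objects; since Python 3.11, passing bare coroutines is an error.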
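Calling `cancel()` only requests cancellation. A slightly fuller sketch of the race pattern awaits the losers afterwards so their cancellation has finished before moving on:

```python
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = {
        asyncio.create_task(worker("fast", 0.01)),
        asyncio.create_task(worker("slow", 1)),
    }
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Await the cancelled tasks so the cancellation is fully processed;
    # return_exceptions=True keeps their CancelledError from propagating.
    await asyncio.gather(*pending, return_exceptions=True)
    winner = done.pop().result()
    return winner, all(task.cancelled() for task in pending)

winner, losers_cancelled = asyncio.run(main())
print(winner, losers_cancelled)
```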

Consume results as tasks finish

Process faster tasks first using `as_completed()`.

python
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [worker("a", 0.3), worker("b", 0.1), worker("c", 0.2)]
    for future in asyncio.as_completed(tasks):
        print(await future)

asyncio.run(main())
Notes

Useful when you want to handle each result as soon as it is ready instead of waiting for the whole batch. Results arrive in completion order, not submission order.

Cancellation and Timeouts

Cancel tasks safely and apply time limits.

Cancel a task

Request cancellation and await the task to observe it.

python
import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("clean up before exit")
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled")

asyncio.run(main())
Notes

Cancellation is cooperative. Handle `CancelledError` for cleanup, then re-raise.

Use `asyncio.timeout()`

Apply a deadline with an async context manager.

python
import asyncio

async def main():
    try:
        async with asyncio.timeout(0.2):
            await asyncio.sleep(1)
    except TimeoutError:
        print("timed out")

asyncio.run(main())
Notes

The preferred way to put a time limit on a block of async code. `asyncio.timeout()` requires Python 3.11 or later.

Use `asyncio.wait_for()`

Wrap an awaitable with a timeout.

python
import asyncio

async def slow():
    await asyncio.sleep(2)

async def main():
    try:
        await asyncio.wait_for(slow(), timeout=0.5)
    except TimeoutError:
        print("too slow")

asyncio.run(main())
Notes

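A classic timeout helper that cancels the wrapped awaitable if the timeout expires. On Python 3.10 and earlier, catch `asyncio.TimeoutError`; it became an alias of the built-in `TimeoutError` in 3.11.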
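To see that `wait_for()` really cancels the wrapped awaitable, the coroutine can record the `CancelledError` it receives. A small sketch; the `state` dict is only there to make the cancellation observable, and `asyncio.TimeoutError` is used so the code behaves the same on older Python versions:

```python
import asyncio

state = {"cancelled": False}   # illustrative flag, set when slow() is cancelled

async def slow():
    try:
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        state["cancelled"] = True   # wait_for() cancelled us on timeout
        raise

async def main():
    try:
        await asyncio.wait_for(slow(), timeout=0.01)
    except asyncio.TimeoutError:
        return "too slow"
    return "finished"

outcome = asyncio.run(main())
print(outcome, state["cancelled"])
```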

Shield a task from outer cancellation

Prevent cancellation from immediately propagating to a task.

python
import asyncio

async def worker():
    await asyncio.sleep(1)
    return "finished"

async def main():
    task = asyncio.create_task(worker())
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=0.1)
    except TimeoutError:
        print("caller timed out")
    print(await task)

asyncio.run(main())
Notes

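Use sparingly, when a task must keep running even if its caller times out or is cancelled. `shield()` protects only against cancellation of the awaiting side; calling `cancel()` on the task itself still cancels it.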

Queues and Producer/Consumer Patterns

Coordinate tasks with asyncio queues.

Use an asyncio.Queue

Exchange work between producers and consumers.

python
import asyncio

async def producer(queue):
    for item in [1, 2, 3]:
        await queue.put(item)
    await queue.put(None)

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))
    await queue.join()

asyncio.run(main())
Notes

Queues are designed for async code and are not thread-safe.
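When items have to come from another thread, one option is to have the thread schedule `put_nowait()` on the loop with `loop.call_soon_threadsafe()` rather than touching the queue directly. A sketch under that assumption; `blocking_producer` is a hypothetical thread target:

```python
import asyncio
import threading

def blocking_producer(loop, queue):
    # Runs in a plain thread, so it must not call queue methods directly.
    # call_soon_threadsafe() asks the loop to run put_nowait() in its own thread.
    for item in (1, 2, 3):
        loop.call_soon_threadsafe(queue.put_nowait, item)
    loop.call_soon_threadsafe(queue.put_nowait, None)   # sentinel: no more items

async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    thread = threading.Thread(target=blocking_producer, args=(loop, queue))
    thread.start()
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    thread.join()   # the producer has already finished by the time the sentinel arrives
    return received

received = asyncio.run(main())
print(received)
```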

Add a timeout to queue operations

Wrap queue operations with `wait_for()`.

python
import asyncio

async def main():
    queue = asyncio.Queue()
    try:
        item = await asyncio.wait_for(queue.get(), timeout=1)
        print(item)
    except TimeoutError:
        print("no item available")

asyncio.run(main())
Notes

Queue methods do not accept a timeout directly; wrap them with `asyncio.wait_for()`.

Use a PriorityQueue

Process lower-number priorities first.

python
import asyncio

async def main():
    queue = asyncio.PriorityQueue()
    await queue.put((20, "normal"))
    await queue.put((10, "urgent"))
    print(await queue.get())
    print(await queue.get())

asyncio.run(main())
Notes

Useful for schedulers, crawlers, and retry backoffs. Entries come out lowest first, so `(10, "urgent")` is retrieved before `(20, "normal")`.
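Entries are compared as whole tuples, so two items with the same priority fall back to comparing their payloads, which fails for types such as dicts. A common workaround, sketched here with an assumed `itertools.count()` tie-breaker, is to add a sequence number as the second element:

```python
import asyncio
import itertools

async def main():
    queue = asyncio.PriorityQueue()
    counter = itertools.count()   # tie-breaker: preserves insertion order
    jobs = [
        (20, {"name": "normal"}),
        (10, {"name": "urgent"}),
        (10, {"name": "also urgent"}),
    ]
    for priority, job in jobs:
        # The counter is compared before the dict, so dicts are never compared.
        await queue.put((priority, next(counter), job))
    order = []
    while not queue.empty():
        _, _, job = await queue.get()
        order.append(job["name"])
    return order

order = asyncio.run(main())
print(order)
```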

Synchronization Primitives

Coordinate async tasks with locks, events, semaphores, and conditions.

Protect shared state with a Lock

Prevent overlapping access to a critical section.

python
import asyncio

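counter = 0

async def increment(lock):
    global counter
    async with lock:
        current = counter
        await asyncio.sleep(0)  # yield to other tasks mid-update
        counter = current + 1

async def main():
    # Create the lock inside the running loop; before Python 3.10, a
    # module-level lock binds to a different loop than asyncio.run() uses.
    lock = asyncio.Lock()
    await asyncio.gather(*(increment(lock) for _ in range(100)))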
    print(counter)

asyncio.run(main())
Notes

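Use `asyncio.Lock()` when a task reads and then writes shared state across an `await`. Without the lock, another task can run at the `await` and the update is lost.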

Signal readiness with an Event

Wake waiting tasks once a condition becomes true.

python
import asyncio

async def waiter(event):
    await event.wait()
    print("event received")

async def setter(event):
    await asyncio.sleep(0.2)
    event.set()

async def main():
    # Create the event inside the running loop (required before Python 3.10).
    event = asyncio.Event()
    await asyncio.gather(waiter(event), setter(event))

asyncio.run(main())
Notes

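Well suited to one-to-many notifications: `set()` wakes every task blocked in `wait()`, and the event stays set until `clear()` is called.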
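The one-to-many behavior is easiest to see with several waiters on the same event. A minimal sketch; the waiter names and the `woken` list are illustrative:

```python
import asyncio

async def waiter(event, name, woken):
    await event.wait()      # blocks until the event is set
    woken.append(name)

async def main():
    event = asyncio.Event()
    woken = []
    waiters = [
        asyncio.create_task(waiter(event, name, woken))
        for name in ("a", "b", "c")
    ]
    await asyncio.sleep(0.01)   # give the waiters time to start and block
    event.set()                 # a single set() releases all of them
    await asyncio.gather(*waiters)
    return sorted(woken)

woken = asyncio.run(main())
print(woken)
```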

Limit concurrency with a Semaphore

Throttle parallel access to a finite resource.

python
import asyncio

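async def fetch(sem, i):
    async with sem:
        print("start", i)
        await asyncio.sleep(0.2)
        print("done", i)

async def main():
    # At most 3 fetches run at once. Create the semaphore inside the
    # running loop (required before Python 3.10).
    sem = asyncio.Semaphore(3)
    await asyncio.gather(*(fetch(sem, i) for i in range(8)))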

asyncio.run(main())
Notes

Useful for rate-limited APIs, database pools, and bounded parallelism.
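One way to confirm the limit is to count how many tasks are inside the semaphore at once. This sketch tracks the peak; the `stats` dict is an illustrative helper, not part of asyncio:

```python
import asyncio

async def fetch(sem, stats):
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)   # stands in for the rate-limited call
        stats["active"] -= 1

async def main():
    sem = asyncio.Semaphore(3)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(fetch(sem, stats) for _ in range(8)))
    return stats["peak"]

peak = asyncio.run(main())
print(peak)
```

With eight tasks and a limit of three, the peak reaches the limit but never exceeds it.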
