Python asyncio Recipes

Real-world asyncio recipes for HTTP concurrency, retries, and pipelines.

View
StandardDetailedCompact
Export
Copy the compact sheet, download it, or print it.
Download
`D` dense toggle · `C` copy all

Bounded Concurrency

Keep high throughput without overwhelming downstream services.

Bound a worker pool with Semaphore

Throttle async work to a safe level.

pythonANYsemaphoreworker-poolthroughput
python
import asyncio

sem = asyncio.Semaphore(5)

async def work(item):
    async with sem:
        await asyncio.sleep(0.2)
        return item * 10

async def main():
    results = await asyncio.gather(*(work(i) for i in range(20)))
    print(results[:5])

asyncio.run(main())
Notes

A standard pattern for APIs, crawlers, and batch processors.

Process input in batches

Break large workloads into fixed-size async chunks.

pythonANYbatchinggatherthroughput
python
import asyncio

async def process(x):
    await asyncio.sleep(0.05)
    return x * 2

async def run_batch(items, size=10):
    results = []
    for i in range(0, len(items), size):
        batch = items[i:i+size]
        results.extend(await asyncio.gather(*(process(x) for x in batch)))
    return results

print(asyncio.run(run_batch(list(range(25)))))
Notes

Batches help when you want memory control or stricter request pacing.

Retries and Resilience

Add retries, backoff, and cancellation-aware wrappers.

Retry an async operation

Retry with simple exponential backoff.

pythonANYretrybackoffresilience
python
import asyncio

attempts = 0

async def flaky():
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError("temporary failure")
    return "ok"

async def with_retry():
    delay = 0.2
    for _ in range(5):
        try:
            return await flaky()
        except RuntimeError:
            await asyncio.sleep(delay)
            delay *= 2
    raise RuntimeError("exhausted retries")

print(asyncio.run(with_retry()))
Notes

Keep retries bounded and cancellation-aware in production systems.

Cancel cleanly on shutdown

Maintain and cancel outstanding tasks at shutdown time.

pythonANYshutdowncancelgraceful
python
import asyncio

async def worker():
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("worker shutting down")
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.2)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())
Notes

Use this pattern to ensure graceful shutdown in services and daemons.

Debugging and Introspection

Inspect running tasks and enable asyncio debug support.

Inspect running tasks

List currently active tasks in the running loop.

pythonANYall-tasksintrospectiondebugging
python
import asyncio

async def sleeper():
    await asyncio.sleep(1)

async def main():
    task = asyncio.create_task(sleeper(), name="demo-task")
    for t in asyncio.all_tasks():
        print(t.get_name())
    await task

asyncio.run(main())
Notes

Use task introspection when debugging leaked or stuck coroutines.

Enable asyncio debug mode

Turn on extra diagnostics while developing.

pythonANYdebugdiagnosticsloop
python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    await asyncio.sleep(0.1)

asyncio.run(main(), debug=True)
Notes

Debug mode helps surface slow callbacks, resource warnings, and some misuse patterns.

Recommended next

No recommendations yet.