Python asyncio Recipes

Real-world asyncio recipes for HTTP concurrency, retries, and pipelines.

View
StandardDetailedCompact
Export
Copy the compact sheet, download it, or print it.
Download
`D` dense toggle · `C` copy all
## Bounded Concurrency
Bound a worker pool with Semaphore
import asyncio

sem = asyncio.Semaphore(5)

async def work(item):
    async with sem:
        await asyncio.sleep(0.2)
        return item * 10

async def main():
    results = await asyncio.gather(*(work(i) for i in range(20)))
    print(results[:5])

asyncio.run(main())

# Throttle async work to a safe level.

Process input in batches
import asyncio

async def process(x):
    await asyncio.sleep(0.05)
    return x * 2

async def run_batch(items, size=10):
    results = []
    for i in range(0, len(items), size):
        batch = items[i:i+size]
        results.extend(await asyncio.gather(*(process(x) for x in batch)))
    return results

print(asyncio.run(run_batch(list(range(25)))))

# Break large workloads into fixed-size async chunks.

## Retries and Resilience
Retry an async operation
import asyncio

attempts = 0

async def flaky():
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError("temporary failure")
    return "ok"

async def with_retry():
    delay = 0.2
    for _ in range(5):
        try:
            return await flaky()
        except RuntimeError:
            await asyncio.sleep(delay)
            delay *= 2
    raise RuntimeError("exhausted retries")

print(asyncio.run(with_retry()))

# Retry with simple exponential backoff.

Cancel cleanly on shutdown
import asyncio

async def worker():
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("worker shutting down")
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.2)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())

# Maintain and cancel outstanding tasks at shutdown time.

## Debugging and Introspection
Inspect running tasks
import asyncio

async def sleeper():
    await asyncio.sleep(1)

async def main():
    task = asyncio.create_task(sleeper(), name="demo-task")
    for t in asyncio.all_tasks():
        print(t.get_name())
    await task

asyncio.run(main())

# List currently active tasks in the running loop.

Enable asyncio debug mode
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    await asyncio.sleep(0.1)

asyncio.run(main(), debug=True)

# Turn on extra diagnostics while developing.

Recommended next

No recommendations yet.