import asyncio
async def main():
    """Print "hello", pause for one second, then print "world"."""
    print("hello")
    await asyncio.sleep(1)
    print("world")
asyncio.run(main())

Use `asyncio.run()` for most top-level programs. It creates the event loop, runs the coroutine, and closes the loop.
Core asyncio patterns for writing concurrent I/O-bound Python programs with async and await.
Run coroutines, sleep, and structure your entrypoint.
import asyncio
async def main():
    """Print "hello", pause for one second, then print "world"."""
    print("hello")
    await asyncio.sleep(1)
    print("world")
asyncio.run(main())

Use `asyncio.run()` for most top-level programs. It creates the event loop, runs the coroutine, and closes the loop.
import asyncio
async def fetch_name():
    """Return the string "jon" after a 0.2-second pause."""
    await asyncio.sleep(0.2)
    return "jon"
async def main():
    """Await `fetch_name()` and print the value it returns."""
    name = await fetch_name()
    print(name)
asyncio.run(main())

Calling an `async def` function returns a coroutine object; its body does not run until the coroutine is awaited or scheduled. Use `await` only inside another async function.
Reuse a managed loop for multiple top-level async calls.
import asyncio
async def ping(name):
    """Return "pong:<name>" after a 0.1-second pause."""
    await asyncio.sleep(0.1)
    return f"pong:{name}"
with asyncio.Runner() as runner:
    print(runner.run(ping("a")))
    print(runner.run(ping("b")))

Useful when you need to call multiple async entrypoints in one synchronous process.
import asyncio
async def main():
    """Print how much event-loop time elapses across a 0.5-second sleep."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    await asyncio.sleep(0.5)
    print(loop.time() - start)
asyncio.run(main())

Use `loop.time()` for deadlines and timeouts instead of wall-clock time.
Run coroutines concurrently with tasks, gather, wait, and task groups.
import asyncio
async def worker():
    """Sleep for one second, then print "done"."""
    await asyncio.sleep(1)
    print("done")
async def main():
    """Schedule `worker()` as a task and wait for it to finish."""
    # Holding the task in a local keeps a reference to it while it runs.
    task = asyncio.create_task(worker())
    await task
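asyncio.run(main())

A Task wraps and schedules a coroutine. Keep a reference to it: the event loop holds only weak references to tasks, so an unreferenced task can be garbage-collected mid-execution and its exceptions or cancellation lost.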
import asyncio
async def square(x):
    """Return `x * x` after a 0.1-second pause."""
    await asyncio.sleep(0.1)
    return x * x
async def main():
    """Run three `square()` calls concurrently and print the result list."""
    results = await asyncio.gather(square(2), square(3), square(4))
    print(results)
asyncio.run(main())

Use `gather()` to run multiple awaitables concurrently and preserve result order.
import asyncio
async def ok():
    """Return the string "ok" after a 0.1-second pause."""
    await asyncio.sleep(0.1)
    return "ok"
async def bad():
    """Raise `ValueError("boom")` after a 0.1-second pause."""
    await asyncio.sleep(0.1)
    raise ValueError("boom")
async def main():
    """Gather `ok()` and `bad()` and print the combined result list."""
    # return_exceptions=True puts the ValueError in the list instead of raising it.
    results = await asyncio.gather(ok(), bad(), return_exceptions=True)
    print(results)
asyncio.run(main())

With `return_exceptions=True`, exceptions are returned in the result list instead of being raised immediately.
import asyncio
async def worker(name, delay):
    """Sleep for `delay` seconds, then print `name`."""
    await asyncio.sleep(delay)
    print(name)
async def main():
    """Run two `worker()` tasks inside a task group."""
    # Leaving the `async with` block waits for both tasks to finish.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("a", 0.2))
        tg.create_task(worker("b", 0.1))
asyncio.run(main())

Task groups provide a safer way to manage related tasks. If one task fails, the group cancels the rest and re-raises the failures as an `ExceptionGroup`.
import asyncio
async def worker(name, delay):
    """Sleep for `delay` seconds, then return `name`."""
    await asyncio.sleep(delay)
    return name
async def main():
    """Race two workers, print the first result, and cancel the loser."""
    tasks = {asyncio.create_task(worker("fast", 0.1)), asyncio.create_task(worker("slow", 1))}
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(task.result())
    # Whatever has not finished yet lost the race.
    for task in pending:
        task.cancel()
asyncio.run(main())

Useful for races and cancellation patterns.
import asyncio
async def worker(name, delay):
    """Sleep for `delay` seconds, then return `name`."""
    await asyncio.sleep(delay)
    return name
async def main():
    """Start three workers and print each result as soon as it is ready."""
    tasks = [worker("a", 0.3), worker("b", 0.1), worker("c", 0.2)]
    # as_completed() yields awaitables in completion order, not list order.
    for future in asyncio.as_completed(tasks):
        print(await future)
asyncio.run(main())

Useful when you want streaming-like processing of concurrent results.
Cancel tasks safely and apply time limits.
import asyncio
async def worker():
    """Sleep for ten seconds; on cancellation, print a cleanup message and re-raise."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("clean up before exit")
        # Re-raise so the task is actually marked as cancelled.
        raise
async def main():
    """Start `worker()`, cancel it after 0.1 seconds, and report the cancellation."""
    task = asyncio.create_task(worker())
    # Give the worker time to start before cancelling it.
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled")
asyncio.run(main())

Cancellation is cooperative. Handle `CancelledError` for cleanup, then re-raise.
import asyncio
async def main():
try:
async with asyncio.timeout(0.2):
await asyncio.sleep(1)
except TimeoutError:
print("timed out")
asyncio.run(main())

Modern timeout pattern for scoped async operations.
import asyncio
async def slow():
    """Sleep for two seconds."""
    await asyncio.sleep(2)
async def main():
    """Await `slow()` with a 0.5-second limit and report when it is exceeded."""
    try:
        await asyncio.wait_for(slow(), timeout=0.5)
    except TimeoutError:
        print("too slow")
asyncio.run(main())

A classic timeout helper that cancels the wrapped awaitable if the timeout expires.
import asyncio
async def worker():
    """Return the string "finished" after a one-second pause."""
    await asyncio.sleep(1)
    return "finished"
async def main():
    """Time out while waiting on a shielded task, then await its real result."""
    task = asyncio.create_task(worker())
    try:
        # shield() keeps the timeout's cancellation from reaching `task`.
        await asyncio.wait_for(asyncio.shield(task), timeout=0.1)
    except TimeoutError:
        print("caller timed out")
    # The task is still running, so it can be awaited to completion here.
    print(await task)
asyncio.run(main())

Use carefully when a task must continue even if the caller times out.
Coordinate tasks with asyncio queues.
import asyncio
async def producer(queue):
    """Put 1, 2 and 3 on `queue`, followed by a `None` sentinel."""
    for item in [1, 2, 3]:
        await queue.put(item)
    # None tells the consumer that no more items are coming.
    await queue.put(None)
async def consumer(queue):
    """Print items from `queue` until a `None` sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            # Mark the sentinel done as well, so queue.join() can return.
            queue.task_done()
            break
        print(item)
        queue.task_done()
async def main():
    """Run `producer()` and `consumer()` on one queue, then wait for it to drain."""
    queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))
    # join() returns once every item put on the queue has been marked done.
    await queue.join()
asyncio.run(main())

Queues are designed for async code and are not thread-safe.
import asyncio
async def main():
    """Wait up to one second for an item on an empty queue and report the outcome."""
    queue = asyncio.Queue()
    try:
        item = await asyncio.wait_for(queue.get(), timeout=1)
        print(item)
    except TimeoutError:
        print("no item available")
asyncio.run(main())

Queue methods do not accept a timeout directly; wrap them with `asyncio.wait_for()`.
import asyncio
async def main():
    """Put two prioritized entries on a priority queue and print them as retrieved."""
    queue = asyncio.PriorityQueue()
    await queue.put((20, "normal"))
    await queue.put((10, "urgent"))
    # Entries come back lowest first, so (10, "urgent") is printed before (20, "normal").
    print(await queue.get())
    print(await queue.get())
asyncio.run(main())

Useful for schedulers, crawlers, and retry backoffs.
Coordinate async tasks with locks, events, semaphores, and conditions.
import asyncio
counter = 0
lock = asyncio.Lock()
async def increment():
    """Add one to the module-level `counter` while holding `lock`."""
    global counter
    async with lock:
        current = counter
        # Yield between the read and the write; the lock keeps other
        # tasks from updating `counter` in that gap.
        await asyncio.sleep(0)
        counter = current + 1
async def main():
    """Run 100 concurrent `increment()` calls and print the final counter."""
    await asyncio.gather(*(increment() for _ in range(100)))
    print(counter)
asyncio.run(main())

Use `asyncio.Lock()` when multiple tasks mutate shared state.
import asyncio
event = asyncio.Event()
async def waiter():
    """Block until the module-level `event` is set, then print a message."""
    await event.wait()
    print("event received")
async def setter():
    """Set the module-level `event` after a 0.2-second pause."""
    await asyncio.sleep(0.2)
    event.set()
async def main():
    """Run `waiter()` and `setter()` concurrently."""
    await asyncio.gather(waiter(), setter())
asyncio.run(main())

Great for one-to-many notifications.
import asyncio
sem = asyncio.Semaphore(3)
async def fetch(i):
    """Print start and done markers for `i` around a 0.2-second pause, holding `sem`."""
    # The semaphore was created with a value of 3, so at most three
    # calls can be inside this block at once.
    async with sem:
        print("start", i)
        await asyncio.sleep(0.2)
        print("done", i)
async def main():
    """Run eight `fetch()` calls concurrently."""
    await asyncio.gather(*(fetch(i) for i in range(8)))
asyncio.run(main())

Useful for rate-limited APIs, database pools, and bounded parallelism.