Python Asyncio Patterns: Concurrency Without the Headaches

Asyncio enables concurrent I/O without threads. These patterns help you use it effectively without falling into common traps. Basic Structure 1 2 3 4 5 6 7 8 9 import asyncio async def main(): print("Hello") await asyncio.sleep(1) print("World") # Python 3.7+ asyncio.run(main()) HTTP Requests with aiohttp 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import aiohttp import asyncio async def fetch(session, url): async with session.get(url) as response: return await response.text() async def fetch_all(urls): async with aiohttp.ClientSession() as session: tasks = [fetch(session, url) for url in urls] return await asyncio.gather(*tasks) # Usage urls = [ "https://api.example.com/users", "https://api.example.com/posts", "https://api.example.com/comments", ] results = asyncio.run(fetch_all(urls)) Task Management Running Tasks Concurrently 1 2 3 4 5 6 7 8 9 10 11 12 13 14 async def task_a(): await asyncio.sleep(2) return "A done" async def task_b(): await asyncio.sleep(1) return "B done" async def main(): # Run concurrently, wait for all results = await asyncio.gather(task_a(), task_b()) print(results) # ['A done', 'B done'] - takes ~2s total, not 3s asyncio.run(main()) Handle Exceptions in gather 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 async def might_fail(n): if n == 2: raise ValueError("Task 2 failed") await asyncio.sleep(n) return f"Task {n} done" async def main(): # return_exceptions=True prevents one failure from canceling others results = await asyncio.gather( might_fail(1), might_fail(2), might_fail(3), return_exceptions=True ) for result in results: if isinstance(result, Exception): print(f"Error: {result}") else: print(result) asyncio.run(main()) First Completed 1 2 3 4 5 6 7 8 9 10 11 12 13 14 async def main(): tasks = [ asyncio.create_task(fetch(session, url1)), asyncio.create_task(fetch(session, url2)), ] # Return when first completes done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) # Cancel remaining for task in 
pending: task.cancel() return done.pop().result() Timeout 1 2 3 4 5 6 7 8 9 10 11 async def slow_operation(): await asyncio.sleep(10) return "done" async def main(): try: result = await asyncio.wait_for(slow_operation(), timeout=5.0) except asyncio.TimeoutError: print("Operation timed out") asyncio.run(main()) Semaphores (Limiting Concurrency) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 async def fetch_with_limit(session, url, semaphore): async with semaphore: async with session.get(url) as response: return await response.text() async def main(): semaphore = asyncio.Semaphore(10) # Max 10 concurrent requests async with aiohttp.ClientSession() as session: tasks = [ fetch_with_limit(session, url, semaphore) for url in urls ] results = await asyncio.gather(*tasks) Queues for Producer/Consumer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 async def producer(queue, items): for item in items: await queue.put(item) print(f"Produced: {item}") # Signal completion await queue.put(None) async def consumer(queue, name): while True: item = await queue.get() if item is None: queue.task_done() break print(f"{name} processing: {item}") await asyncio.sleep(1) # Simulate work queue.task_done() async def main(): queue = asyncio.Queue(maxsize=10) # Start producer and multiple consumers await asyncio.gather( producer(queue, range(20)), consumer(queue, "Worker-1"), consumer(queue, "Worker-2"), ) asyncio.run(main()) Error Handling Patterns Task Exception Handling 1 2 3 4 5 6 7 8 9 10 11 12 13 async def risky_task(): await asyncio.sleep(1) raise ValueError("Something went wrong") async def main(): task = asyncio.create_task(risky_task()) try: await task except ValueError as e: print(f"Caught: {e}") asyncio.run(main()) Background Task Exceptions 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def handle_exception(loop, context): msg = context.get("exception", context["message"]) print(f"Caught exception: {msg}") async def background_task(): await asyncio.sleep(1) raise 
RuntimeError("Background failure") async def main(): loop = asyncio.get_event_loop() loop.set_exception_handler(handle_exception) # Fire and forget - exception won't crash main asyncio.create_task(background_task()) await asyncio.sleep(5) asyncio.run(main()) Context Managers 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import asyncio from contextlib import asynccontextmanager @asynccontextmanager async def managed_resource(): print("Acquiring resource") resource = await create_resource() try: yield resource finally: print("Releasing resource") await resource.close() async def main(): async with managed_resource() as resource: await resource.do_something() Running Blocking Code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncio from concurrent.futures import ThreadPoolExecutor def blocking_io(): # Simulates blocking I/O import time time.sleep(2) return "Done" async def main(): loop = asyncio.get_event_loop() # Run in thread pool result = await loop.run_in_executor(None, blocking_io) print(result) # With custom executor with ThreadPoolExecutor(max_workers=4) as executor: result = await loop.run_in_executor(executor, blocking_io) asyncio.run(main()) Periodic Tasks 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 async def periodic_task(interval, func): while True: await func() await asyncio.sleep(interval) async def heartbeat(): print("Heartbeat") async def main(): # Start periodic task in background task = asyncio.create_task(periodic_task(5, heartbeat)) # Do other work await asyncio.sleep(20) # Cancel when done task.cancel() try: await task except asyncio.CancelledError: print("Periodic task cancelled") asyncio.run(main()) Graceful Shutdown 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import signal async def shutdown(signal, loop): print(f"Received {signal.name}") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] for task in tasks: task.cancel() await asyncio.gather(*tasks, 
return_exceptions=True) loop.stop() async def main(): loop = asyncio.get_event_loop() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler( sig, lambda s=sig: asyncio.create_task(shutdown(s, loop)) ) # Your long-running tasks here await asyncio.sleep(3600) asyncio.run(main()) Common Pitfalls Don’t Block the Event Loop 1 2 3 4 5 6 7 8 9 # BAD - blocks entire event loop async def bad(): time.sleep(5) # Blocking! return "done" # GOOD - use async sleep or run_in_executor async def good(): await asyncio.sleep(5) return "done" Don’t Forget to Await 1 2 3 4 5 6 7 # BAD - coroutine never runs async def main(): fetch_data() # Missing await! # GOOD async def main(): await fetch_data() Create Tasks Properly 1 2 3 4 5 6 7 8 9 # BAD - task may be garbage collected async def main(): asyncio.create_task(background_work()) # Task might not complete # GOOD - keep reference async def main(): task = asyncio.create_task(background_work()) await task # or store in set Don’t Mix Sync and Async 1 2 3 4 5 6 7 # BAD - calling async from sync incorrectly def sync_function(): result = async_function() # Returns coroutine, not result # GOOD - use asyncio.run or run_in_executor def sync_function(): result = asyncio.run(async_function()) Testing Async Code 1 2 3 4 5 6 7 8 9 10 11 12 13 import pytest import asyncio @pytest.mark.asyncio async def test_async_function(): result = await my_async_function() assert result == expected # Or with unittest class TestAsync(unittest.IsolatedAsyncioTestCase): async def test_something(self): result = await my_async_function() self.assertEqual(result, expected) Quick Reference 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 # Run async function asyncio.run(main()) # Concurrent execution await asyncio.gather(task1(), task2()) # Create background task task = asyncio.create_task(coro()) # Timeout await asyncio.wait_for(coro(), timeout=5.0) # Limit concurrency semaphore = asyncio.Semaphore(10) async with semaphore: ... 
# Run blocking code await loop.run_in_executor(None, blocking_func) # Sleep await asyncio.sleep(1) Asyncio shines for I/O-bound workloads—HTTP requests, database queries, file operations. It won’t help with CPU-bound work (use multiprocessing for that). ...

February 25, 2026 · 6 min · 1194 words · Rob Washington