Async Python lets you handle thousands of concurrent I/O operations on a single thread. No thread pools, no extra processes, no GIL headaches. But it requires thinking differently about how code executes.
These patterns help you write async code that’s both correct and efficient.
The Basics#
import asyncio

import aiohttp  # the original snippet used aiohttp without importing it


async def fetch_data(url: str) -> dict:
    """Fetch ``url`` and return the decoded JSON body.

    This is a coroutine - it can be paused and resumed at each ``await``.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()


# Running coroutines
async def main():
    data = await fetch_data("https://api.example.com/data")
    print(data)


if __name__ == "__main__":  # guard so importing this module doesn't fire a request
    asyncio.run(main())
`await` pauses the current coroutine until the awaited result is ready, letting other coroutines run in the meantime.
Concurrent Execution#
gather: Run Multiple Coroutines#
async def main():
    """Fire three fetches at once and unpack the results in order."""
    # gather schedules all three coroutines concurrently, not sequentially;
    # results come back in the same order the coroutines were passed in,
    # so tuple-unpacking directly is safe.
    users, posts, comments = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/posts"),
        fetch_data("https://api.example.com/comments"),
    )
TaskGroup: Better Error Handling (Python 3.11+)#
async def main():
    """Run three fetches under a TaskGroup and return their results.

    If any task raises, the TaskGroup cancels the rest and re-raises
    the error(s) as an ExceptionGroup (Python 3.11+).
    """
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_data(u)) for u in (url1, url2, url3)]
    # The `async with` block only exits once every task has completed,
    # so calling .result() here never blocks.
    return tuple(task.result() for task in tasks)
as_completed: Process Results as They Arrive#
async def main():
    """Handle fetch results in completion order, not submission order."""
    pending = [fetch_data(url) for url in urls]
    # as_completed yields awaitables as each one finishes, so fast
    # responses are processed without waiting on the slow ones.
    for finished in asyncio.as_completed(pending):
        result = await finished
        print(f"Got result: {result}")  # Process as each completes
Timeouts#
async def fetch_with_timeout(url: str, timeout: float = 5.0):
    """Fetch ``url``, giving up after ``timeout`` seconds.

    Returns the decoded response, or None if the deadline expires.
    asyncio.timeout() requires Python 3.11+.
    """
    try:
        async with asyncio.timeout(timeout):
            return await fetch_data(url)
    except asyncio.TimeoutError:
        return None


# Or with gather. Note that `await` is only legal inside a coroutine --
# the original showed this at module level, which is a SyntaxError.
async def fetch_both(url1: str, url2: str):
    results = await asyncio.gather(
        asyncio.wait_for(fetch_data(url1), timeout=5.0),
        asyncio.wait_for(fetch_data(url2), timeout=5.0),
        return_exceptions=True,  # Don't fail all if one times out
    )
    return results
Semaphores: Limit Concurrency#
async def fetch_with_limit(urls: list[str], max_concurrent: int = 10):
    """Fetch every URL concurrently, with at most ``max_concurrent`` in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(url: str):
        async with semaphore:  # Only N coroutines run at once
            return await fetch_data(url)

    return await asyncio.gather(*[fetch_one(url) for url in urls])


# Fetch 1000 URLs, but only 10 at a time. As with any `await`, the call
# must happen inside a coroutine (the original showed it at module level,
# which is a SyntaxError):
async def main():
    results = await fetch_with_limit(urls, max_concurrent=10)
    return results
Queues: Producer/Consumer#
async def producer(queue: asyncio.Queue, items: list):
    """Feed every item into the queue.

    Shutdown is signalled by main() *after* queue.join(), one sentinel
    per worker. (The original also put a single None here, which made
    one of the workers exit early and left a stray sentinel in the
    queue after main() added its own.)
    """
    for item in items:
        await queue.put(item)


async def consumer(queue: asyncio.Queue, worker_id: int):
    """Process items until a None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()  # account for the sentinel so join() can finish
            break
        result = await process(item)
        print(f"Worker {worker_id} processed {item}")
        queue.task_done()


async def main():
    # maxsize bounds memory: a fast producer blocks on put() until
    # consumers catch up (backpressure).
    queue = asyncio.Queue(maxsize=100)
    # Start workers
    workers = [
        asyncio.create_task(consumer(queue, i))
        for i in range(5)
    ]
    # Produce items
    await producer(queue, items)
    # Wait for all items to be processed
    await queue.join()
    # Stop workers: exactly one sentinel per worker
    for _ in workers:
        await queue.put(None)
    await asyncio.gather(*workers)
Async Context Managers#
import asyncpg  # the original snippet used asyncpg without importing it


class AsyncDatabasePool:
    """Async context manager that owns an asyncpg connection pool."""

    async def __aenter__(self):
        self.pool = await asyncpg.create_pool(DATABASE_URL)
        return self.pool

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Implicitly returns None (falsy), so any in-flight exception
        # still propagates after the pool is closed.
        await self.pool.close()


async def main():
    async with AsyncDatabasePool() as pool:
        async with pool.acquire() as conn:
            result = await conn.fetch("SELECT * FROM users")
Async Generators#
async def fetch_pages(base_url: str):
    """Async generator that yields items from a paginated API, page by page."""
    page = 1
    while True:
        data = await fetch_data(f"{base_url}?page={page}")
        items = data["items"]
        if not items:
            break  # an empty page means the results are exhausted
        for item in items:
            yield item
        page += 1


async def main():
    # `async for` drives the generator, awaiting each page fetch lazily.
    async for item in fetch_pages("https://api.example.com/items"):
        print(item)
Error Handling#
async def fetch_with_retry(url: str, retries: int = 3):
    """Fetch ``url`` with up to ``retries`` attempts and exponential backoff.

    Re-raises the last aiohttp.ClientError if every attempt fails.
    Raises ValueError for a non-positive ``retries`` (the original
    silently returned None in that case via loop fall-through).
    """
    if retries < 1:
        raise ValueError("retries must be >= 1")
    for attempt in range(retries):
        try:
            return await fetch_data(url)
        except aiohttp.ClientError:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff


async def fetch_many_with_errors(urls: list[str]):
    """Fetch all URLs concurrently; return (successes, failures).

    ``failures`` is a list of (url, exception) pairs.
    """
    results = await asyncio.gather(
        *[fetch_data(url) for url in urls],
        return_exceptions=True  # exceptions come back as values, not raised
    )
    successes = []
    failures = []
    # gather preserves input order, so zip pairs each url with its outcome
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            failures.append((url, result))
        else:
            successes.append(result)
    return successes, failures
Mixing Sync and Async#
import asyncio
from concurrent.futures import ThreadPoolExecutor  # pass an instance instead of None to control pool size


# Run sync code in a thread pool
async def run_sync_in_thread(sync_func, *args):
    """Await ``sync_func(*args)`` without blocking the event loop."""
    # get_running_loop() is the correct call inside a coroutine;
    # get_event_loop() here is deprecated since Python 3.10.
    # (On 3.9+, asyncio.to_thread() does the same thing in one call.)
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, sync_func, *args)


# CPU-bound work
async def process_data(data):
    # NOTE(review): a thread only helps CPU-bound work if the function
    # releases the GIL (e.g. NumPy); otherwise use a process pool.
    result = await run_sync_in_thread(expensive_computation, data)
    return result


# Blocking I/O that doesn't have an async version
async def read_file_async(path: str):
    return await run_sync_in_thread(lambda: open(path).read())
Common Mistakes#
Forgetting to await#
# ❌ Bad: Creates coroutine but doesn't run it
async def main():
    fetch_data(url)  # This does nothing! (builds a coroutine object and
    # discards it; Python warns "coroutine ... was never awaited")


# ✅ Good
async def main():  # noqa: F811 -- intentional redefinition for the example
    await fetch_data(url)
Blocking the event loop#
# ❌ Bad: Blocks entire event loop
async def main():
    time.sleep(5)  # Nothing else runs during this
    data = requests.get(url)  # Blocking HTTP client


# ✅ Good
async def main():  # noqa: F811 -- intentional redefinition for the example
    await asyncio.sleep(5)  # yields to the event loop while waiting
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
Creating too many tasks#
# ❌ Bad: 10,000 concurrent connections opened at once
async def fetch_all_unbounded():
    tasks = [fetch_data(url) for url in ten_thousand_urls]
    await asyncio.gather(*tasks)


# ✅ Good: Limit concurrency. (Both variants are wrapped in coroutines
# here -- the original showed bare module-level `await`s, which are a
# SyntaxError outside an async function.)
semaphore = asyncio.Semaphore(100)


async def limited_fetch(url):
    async with semaphore:
        return await fetch_data(url)


async def fetch_all_bounded():
    await asyncio.gather(*[limited_fetch(url) for url in ten_thousand_urls])
FastAPI Integration#
from fastapi import FastAPI
from fastapi.responses import StreamingResponse  # the original used StreamingResponse without importing it
import asyncio

app = FastAPI()


@app.get("/aggregate")
async def aggregate_data():
    """Aggregate two upstream APIs with concurrent calls."""
    users, posts = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/posts"),
    )
    return {"users": users, "posts": posts}


@app.get("/stream")
async def stream_data():
    """Stream paginated items to the client as server-sent events."""
    async def generate():
        async for item in fetch_pages("/items"):
            yield f"data: {item}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
Testing Async Code#
import pytest  # the asyncio marker below requires the pytest-asyncio plugin


@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data("https://api.example.com/test")
    assert result["status"] == "ok"


@pytest.mark.asyncio
async def test_concurrent_fetches():
    results = await asyncio.gather(
        fetch_data(url1),
        fetch_data(url2),
    )
    # gather returns one result per coroutine, in submission order
    assert len(results) == 2
Async Python shines for I/O-bound workloads: HTTP requests, database queries, file operations. It won’t help with CPU-bound work — use multiprocessing for that.
Start simple: async/await for sequential async code. Add gather for concurrent operations. Use semaphores when you need to limit concurrency. The patterns compound — once you internalize them, async code becomes natural.
📬 Get the Newsletter
Weekly insights on DevOps, automation, and CLI mastery. No spam, unsubscribe anytime.