Asyncio enables concurrent I/O without threads. These patterns help you use it effectively without falling into common traps.
Basic Structure#
1
2
3
4
5
6
7
8
9
| import asyncio
async def main() -> None:
    """Print a greeting, pausing one second between the two words."""
    print("Hello")
    # Suspends only this coroutine; the event loop stays free meanwhile.
    await asyncio.sleep(1)
    print("World")


# Python 3.7+
if __name__ == "__main__":
    asyncio.run(main())
|
HTTP Requests with aiohttp#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import aiohttp
import asyncio
async def fetch(session, url):
    """Return the body of a GET request to *url* as text.

    Args:
        session: Open client session used to issue the request.
        url: Address to request.
    """
    # The response is used as an async context manager so it is closed
    # once the body has been read (or if reading fails).
    async with session.get(url) as response:
        return await response.text()
async def fetch_all(urls):
    """Fetch every URL concurrently over one shared session.

    Args:
        urls: Iterable of addresses to request.

    Returns:
        The response bodies, in the same order as *urls*.
    """
    async with aiohttp.ClientSession() as session:
        # gather() runs the coroutines concurrently and keeps argument order.
        return await asyncio.gather(*(fetch(session, url) for url in urls))
# Usage
urls = [
    "https://api.example.com/users",
    "https://api.example.com/posts",
    "https://api.example.com/comments",
]

# Guarded so that importing this module does not trigger network requests.
if __name__ == "__main__":
    results = asyncio.run(fetch_all(urls))
|
Task Management#
Running Tasks Concurrently#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def task_a() -> str:
    """Wait two seconds, then return ``"A done"``."""
    await asyncio.sleep(2)
    return "A done"
async def task_b() -> str:
    """Wait one second, then return ``"B done"``."""
    await asyncio.sleep(1)
    return "B done"
async def main() -> None:
    """Run both tasks at the same time and print their results."""
    # Run concurrently, wait for all. Results come back in argument order.
    results = await asyncio.gather(task_a(), task_b())
    print(results)  # ['A done', 'B done'] - takes ~2s total, not 3s


if __name__ == "__main__":
    asyncio.run(main())
|
Handle Exceptions in gather#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def might_fail(n):
    """Sleep *n* seconds and return a status string; fail when *n* is 2.

    Raises:
        ValueError: If *n* equals 2 (raised before any sleeping).
    """
    if n == 2:
        raise ValueError("Task 2 failed")
    await asyncio.sleep(n)
    return f"Task {n} done"
async def main() -> None:
    """Run three tasks together and print each result or failure."""
    # return_exceptions=True hands raised exceptions back as ordinary
    # results instead of propagating the first one to this caller. (Without
    # it the other awaitables are not cancelled either: they keep running
    # while the first exception surfaces here.)
    results = await asyncio.gather(
        might_fail(1),
        might_fail(2),
        might_fail(3),
        return_exceptions=True,
    )
    for result in results:
        # BaseException also matches asyncio.CancelledError, which is not
        # a subclass of Exception on Python 3.8+.
        if isinstance(result, BaseException):
            print(f"Error: {result}")
        else:
            print(result)


if __name__ == "__main__":
    asyncio.run(main())
|
First Completed#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def main():
    """Race two fetches and return the result of whichever finishes first.

    NOTE(review): ``fetch``, ``session``, ``url1`` and ``url2`` are not
    defined in this snippet; they must be supplied by surrounding code.
    """
    tasks = [
        asyncio.create_task(fetch(session, url1)),
        asyncio.create_task(fetch(session, url2)),
    ]
    # Return when first completes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel remaining
    for task in pending:
        task.cancel()
    # cancel() only requests cancellation; wait for the tasks to actually
    # finish so none is still pending when this coroutine returns.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()
|
Timeout#
1
2
3
4
5
6
7
8
9
10
11
async def slow_operation() -> str:
    """Wait ten seconds, then return ``"done"``."""
    await asyncio.sleep(10)
    return "done"
async def main():
    """Run ``slow_operation`` with a five-second limit.

    Returns:
        The operation's result, or ``None`` if it timed out.
    """
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out")
        return None
    return result


if __name__ == "__main__":
    asyncio.run(main())
|
Semaphores (Limiting Concurrency)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def fetch_with_limit(session, url, semaphore):
    """Return the body of a GET request to *url*, gated by *semaphore*.

    Args:
        session: Open client session used to issue the request.
        url: Address to request.
        semaphore: Semaphore held for the whole request, so its initial
            value caps how many requests run at once.
    """
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()
async def main():
    """Fetch every URL with a shared concurrency limit.

    NOTE(review): ``urls`` is read from the enclosing scope; it is not
    defined in this snippet.

    Returns:
        The response bodies, in the same order as ``urls``.
    """
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        return await asyncio.gather(*tasks)
|
Queues for Producer/Consumer#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| async def producer(queue, items):
for item in items:
await queue.put(item)
print(f"Produced: {item}")
# Signal completion
await queue.put(None)
async def consumer(queue, name):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"{name} processing: {item}")
await asyncio.sleep(1) # Simulate work
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# Start producer and multiple consumers
await asyncio.gather(
producer(queue, range(20)),
consumer(queue, "Worker-1"),
consumer(queue, "Worker-2"),
)
asyncio.run(main())
|
Error Handling Patterns#
Task Exception Handling#
1
2
3
4
5
6
7
8
9
10
11
12
13
async def risky_task():
    """Wait one second, then fail.

    Raises:
        ValueError: Always, after the sleep.
    """
    await asyncio.sleep(1)
    raise ValueError("Something went wrong")
async def main() -> None:
    """Run ``risky_task`` as a task and report its failure."""
    task = asyncio.create_task(risky_task())
    try:
        # Awaiting the task re-raises the exception it finished with.
        await task
    except ValueError as e:
        print(f"Caught: {e}")


if __name__ == "__main__":
    asyncio.run(main())
|
Background Task Exceptions#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def handle_exception(loop, context):
    """Event-loop exception handler that prints what went wrong.

    Args:
        loop: Event loop reporting the error (unused here).
        context: Error details. ``"message"`` must be present; the
            ``"exception"`` entry is printed instead when it exists.
    """
    msg = context.get("exception", context["message"])
    print(f"Caught exception: {msg}")
async def background_task():
    """Wait one second, then fail.

    Raises:
        RuntimeError: Always, after the sleep.
    """
    await asyncio.sleep(1)
    raise RuntimeError("Background failure")
async def main() -> None:
    """Install the exception handler, start a failing task, and keep running."""
    # Inside a coroutine, get_running_loop() is the preferred way to reach
    # the loop; get_event_loop() has more complex, policy-dependent behavior.
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handle_exception)
    # Fire and forget - exception won't crash main. Nothing awaits the task,
    # so its failure is reported through the loop's exception handler.
    asyncio.create_task(background_task())
    await asyncio.sleep(5)


if __name__ == "__main__":
    asyncio.run(main())
|
Context Managers#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_resource():
    """Async context manager that creates a resource and always closes it.

    NOTE(review): ``create_resource`` is not defined in this snippet; it is
    assumed to be an awaitable factory whose result has an async ``close()``.
    """
    print("Acquiring resource")
    resource = await create_resource()
    try:
        yield resource
    finally:
        # Runs whether the ``async with`` body finished or raised.
        print("Releasing resource")
        await resource.close()
async def main() -> None:
    """Use the managed resource; it is released when the block exits."""
    async with managed_resource() as resource:
        await resource.do_something()
|
Running Blocking Code#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io() -> str:
    """Block the calling thread for two seconds, then return ``"Done"``."""
    # Simulates blocking I/O
    import time

    time.sleep(2)
    return "Done"
async def main() -> None:
    """Run ``blocking_io`` in worker threads so the event loop stays free."""
    # Inside a coroutine, get_running_loop() is the preferred way to reach
    # the loop; get_event_loop() has more complex, policy-dependent behavior.
    loop = asyncio.get_running_loop()
    # Run in thread pool (None selects the loop's default executor)
    result = await loop.run_in_executor(None, blocking_io)
    print(result)
    # With custom executor
    with ThreadPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(executor, blocking_io)


if __name__ == "__main__":
    asyncio.run(main())
|
Periodic Tasks#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def periodic_task(interval, func):
    """Call *func* forever, sleeping *interval* seconds after each call.

    Args:
        interval: Seconds to sleep between the end of one call and the
            start of the next.
        func: Zero-argument callable returning an awaitable.

    The loop only ends when the task is cancelled or *func* raises.
    """
    while True:
        await func()
        await asyncio.sleep(interval)
async def heartbeat() -> None:
    """Print a single heartbeat line."""
    print("Heartbeat")
async def main() -> None:
    """Run the heartbeat in the background for 20 seconds, then cancel it."""
    # Start periodic task in background
    task = asyncio.create_task(periodic_task(5, heartbeat))
    # Do other work
    await asyncio.sleep(20)
    # Cancel when done
    task.cancel()
    try:
        # Awaiting a cancelled task raises CancelledError once it has stopped.
        await task
    except asyncio.CancelledError:
        print("Periodic task cancelled")


if __name__ == "__main__":
    asyncio.run(main())
|
Graceful Shutdown#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| import signal
async def shutdown(signal, loop):
    """Cancel every other task, wait for them to finish, then stop *loop*.

    Args:
        signal: The received signal; only its ``name`` is used. The
            parameter shadows the ``signal`` module inside this function.
        loop: Event loop to stop once the tasks have finished.
    """
    print(f"Received {signal.name}")
    current = asyncio.current_task()
    tasks = [t for t in asyncio.all_tasks() if t is not current]
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each task's CancelledError instead of
    # raising it here.
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()
async def main() -> None:
    """Install SIGTERM/SIGINT handlers, then run until shut down."""
    # Inside a coroutine, get_running_loop() is the preferred way to reach
    # the loop; get_event_loop() has more complex, policy-dependent behavior.
    loop = asyncio.get_running_loop()
    # The loop keeps only weak references to tasks, so hold the shutdown
    # task here until it completes.
    shutdown_tasks = set()

    def request_shutdown(sig):
        task = asyncio.create_task(shutdown(sig, loop))
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, request_shutdown, sig)
    try:
        # Your long-running tasks here
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # shutdown() cancelled this task. Returning normally lets
        # asyncio.run() finish without raising CancelledError.
        return


if __name__ == "__main__":
    asyncio.run(main())
|
Common Pitfalls#
Don’t Block the Event Loop#
1
2
3
4
5
6
7
8
9
# BAD - blocks entire event loop
import time


async def bad():
    """Anti-pattern, kept on purpose: sleeps synchronously inside a coroutine."""
    time.sleep(5)  # Blocking!
    return "done"
# GOOD - use async sleep or run_in_executor
async def good():
    """Wait five seconds without blocking the event loop, then return ``"done"``."""
    await asyncio.sleep(5)
    return "done"
|
Don’t Forget to Await#
1
2
3
4
5
6
7
# BAD - coroutine never runs
async def main():
    """Anti-pattern, kept on purpose: the coroutine object is created but never awaited."""
    fetch_data()  # Missing await!
# GOOD
async def main():
    """Await the coroutine so that it actually runs."""
    await fetch_data()
|
Create Tasks Properly#
1
2
3
4
5
6
7
8
9
# BAD - task may be garbage collected
async def main():
    """Anti-pattern, kept on purpose: no reference to the created task is kept."""
    asyncio.create_task(background_work())
    # Task might not complete
# GOOD - keep reference
async def main():
    """Hold a reference to the task and wait for it to finish."""
    task = asyncio.create_task(background_work())
    await task  # or store in set
|
Don’t Mix Sync and Async#
1
2
3
4
5
6
7
# BAD - calling async from sync incorrectly
def sync_function():
    """Anti-pattern, kept on purpose: calls a coroutine function without running it."""
    result = async_function()  # Returns coroutine, not result
# GOOD - use asyncio.run or run_in_executor
def sync_function():
    """Run the coroutine to completion from synchronous code."""
    result = asyncio.run(async_function())
|
Testing Async Code#
1
2
3
4
5
6
7
8
9
10
11
12
13
| import pytest
import asyncio
@pytest.mark.asyncio
async def test_async_function():
    """Await the function under test and compare with the expected value.

    NOTE(review): ``my_async_function`` and ``expected`` are placeholders
    that are not defined in this snippet.
    """
    result = await my_async_function()
    assert result == expected
# Or with unittest
import unittest


class TestAsync(unittest.IsolatedAsyncioTestCase):
    """Async test case; ``async def`` test methods are awaited by the runner.

    NOTE(review): ``my_async_function`` and ``expected`` are placeholders
    that are not defined in this snippet.
    """

    async def test_something(self):
        result = await my_async_function()
        self.assertEqual(result, expected)
|
Quick Reference#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| # Run async function
asyncio.run(main())
# Concurrent execution
await asyncio.gather(task1(), task2())
# Create background task
task = asyncio.create_task(coro())
# Timeout
await asyncio.wait_for(coro(), timeout=5.0)
# Limit concurrency
semaphore = asyncio.Semaphore(10)
async with semaphore:
...
# Run blocking code
await loop.run_in_executor(None, blocking_func)
# Sleep
await asyncio.sleep(1)
|
Asyncio shines for I/O-bound workloads—HTTP requests, database queries, and other network calls. Ordinary file operations still block, so hand them to run_in_executor. It won’t help with CPU-bound work (use multiprocessing for that).
Start simple with gather(), add semaphores when you need rate limiting, and always remember: don’t block the event loop.
📬 Get the Newsletter
Weekly insights on DevOps, automation, and CLI mastery. No spam, unsubscribe anytime.