Asyncio enables concurrent I/O without threads. These patterns help you use it effectively without falling into common traps.

Basic Structure

1
2
3
4
5
6
7
8
9
import asyncio

async def main():
    """Print "Hello", pause one second, then print "World"."""
    print("Hello")
    # asyncio.sleep suspends only this coroutine; the loop stays free.
    await asyncio.sleep(1)
    print("World")

# asyncio.run() creates the event loop, runs main(), and closes the loop (3.7+).
asyncio.run(main())

HTTP Requests with aiohttp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import aiohttp
import asyncio

async def fetch(session, url):
    """GET *url* through *session* and return the decoded response body."""
    async with session.get(url) as response:
        body = await response.text()
        return body

async def fetch_all(urls):
    """Fetch every URL in *urls* concurrently; bodies return in input order."""
    # One shared session reuses connections across all the requests.
    async with aiohttp.ClientSession() as session:
        pending = [fetch(session, u) for u in urls]
        return await asyncio.gather(*pending)

# Usage
# NOTE(review): asyncio.run() must be called from synchronous code (not from
# inside a running loop); results come back in the same order as `urls`.
urls = [
    "https://api.example.com/users",
    "https://api.example.com/posts",
    "https://api.example.com/comments",
]
results = asyncio.run(fetch_all(urls))

Task Management

Running Tasks Concurrently

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
async def task_a():
    """Report completion after roughly two seconds."""
    await asyncio.sleep(2)
    return "A done"

async def task_b():
    """Report completion after roughly one second."""
    await asyncio.sleep(1)
    return "B done"

async def main():
    """Run both tasks at once and print the combined result list."""
    # gather() schedules the coroutines concurrently and keeps call order in
    # its result list, so total wall time is max(2, 1) ≈ 2 s rather than 3 s.
    results = await asyncio.gather(task_a(), task_b())
    print(results)  # ['A done', 'B done'] - takes ~2s total, not 3s

asyncio.run(main())

Handle Exceptions in gather

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def might_fail(n):
    """Sleep *n* seconds and succeed — except n == 2, which raises ValueError."""
    if n == 2:
        raise ValueError("Task 2 failed")
    await asyncio.sleep(n)
    return f"Task {n} done"

async def main():
    """Collect successes and failures together, without cancelling siblings."""
    # With return_exceptions=True a raised exception comes back as a value in
    # the result list, so one failing coroutine does not cancel the others.
    outcomes = await asyncio.gather(
        might_fail(1),
        might_fail(2),
        might_fail(3),
        return_exceptions=True,
    )

    for result in outcomes:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(result)

asyncio.run(main())

First Completed

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
async def main():
    """Race two fetches and return whichever response arrives first.

    NOTE(review): ``session``, ``url1``, ``url2``, and ``fetch`` are assumed
    to be defined in the surrounding scope (see the aiohttp example above).
    """
    tasks = [
        asyncio.create_task(fetch(session, url1)),
        asyncio.create_task(fetch(session, url2)),
    ]

    # Return as soon as the first task finishes (successfully or not).
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel the losers, then wait for those cancellations to be processed;
    # otherwise the loop may warn about tasks being destroyed while pending.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # .result() re-raises the winning task's exception if it failed.
    return done.pop().result()

Timeout

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
async def slow_operation():
    """Stand-in for ten seconds of slow work."""
    await asyncio.sleep(10)
    return "done"

async def main():
    """Abort slow_operation if it exceeds a five-second deadline."""
    try:
        # wait_for cancels the wrapped coroutine once the timeout expires.
        value = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())

Semaphores (Limiting Concurrency)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
async def fetch_with_limit(session, url, semaphore):
    """Fetch *url*, but only while holding one slot of *semaphore*."""
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

async def main():
    """Download every URL with at most ten requests in flight at a time."""
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent requests

    # NOTE(review): `urls` is assumed to be defined in the surrounding scope.
    async with aiohttp.ClientSession() as session:
        jobs = [
            fetch_with_limit(session, u, semaphore)
            for u in urls
        ]
        results = await asyncio.gather(*jobs)

Queues for Producer/Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def producer(queue, items, num_consumers=1):
    """Put every element of *items* on *queue*, then signal completion.

    BUG FIX: the original pushed a single ``None`` sentinel, but main() runs
    TWO consumers — the second worker never saw a sentinel, blocked forever
    on ``queue.get()``, and gather() never returned.  Pushing one sentinel
    per consumer lets every worker exit.  ``num_consumers`` defaults to 1,
    so existing callers keep the old behavior.
    """
    for item in items:
        await queue.put(item)  # blocks when the queue is full (backpressure)
        print(f"Produced: {item}")

    # Signal completion: one sentinel per consumer.
    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue, name):
    """Process items from *queue* until the ``None`` sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break

        print(f"{name} processing: {item}")
        await asyncio.sleep(1)  # Simulate work
        queue.task_done()

async def main():
    """Run one producer against two consumer workers over a bounded queue."""
    queue = asyncio.Queue(maxsize=10)  # bounded: producer stalls at 10 items

    # Start producer and multiple consumers — two sentinels, one per worker.
    await asyncio.gather(
        producer(queue, range(20), num_consumers=2),
        consumer(queue, "Worker-1"),
        consumer(queue, "Worker-2"),
    )
asyncio.run(main())

Error Handling Patterns

Task Exception Handling

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
async def risky_task():
    """Fail with ValueError roughly one second after starting."""
    await asyncio.sleep(1)
    raise ValueError("Something went wrong")

async def main():
    """Show that awaiting a task re-raises the exception stored inside it."""
    task = asyncio.create_task(risky_task())

    # The exception raised inside the task surfaces here, at the await.
    try:
        await task
    except ValueError as e:
        print(f"Caught: {e}")

asyncio.run(main())

Background Task Exceptions

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def handle_exception(loop, context):
    """Loop-level handler for exceptions that escape fire-and-forget tasks.

    *context* always carries a "message" key; an "exception" key is present
    only when an actual exception object escaped, so fall back to the message.
    """
    msg = context.get("exception", context["message"])
    print(f"Caught exception: {msg}")

async def background_task():
    """Fail one second after starting."""
    await asyncio.sleep(1)
    raise RuntimeError("Background failure")

async def main():
    # get_running_loop() is the supported way to reach the loop from inside a
    # coroutine; get_event_loop() here is deprecated since Python 3.10.
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handle_exception)

    # Fire and forget - exception won't crash main.  Keep a reference so the
    # task cannot be garbage-collected before it finishes (see the
    # "Create Tasks Properly" pitfall below).
    task = asyncio.create_task(background_task())

    await asyncio.sleep(5)

asyncio.run(main())

Context Managers

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource():
    """Async context manager that guarantees the resource gets released.

    NOTE(review): ``create_resource`` is assumed to be defined elsewhere.
    """
    print("Acquiring resource")
    res = await create_resource()
    try:
        # The `async with` body runs while we are suspended at this yield.
        yield res
    finally:
        # Runs on normal exit AND on exception/cancellation.
        print("Releasing resource")
        await res.close()

async def main():
    """Use the resource; cleanup happens automatically on exit."""
    async with managed_resource() as res:
        await res.do_something()

Running Blocking Code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    """Simulates blocking I/O with a 2 s synchronous sleep."""
    import time
    time.sleep(2)
    return "Done"

async def main():
    """Run blocking work on worker threads so the event loop stays responsive."""
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() here is deprecated since Python 3.10.
    loop = asyncio.get_running_loop()

    # Run in thread pool (None = the loop's default executor).
    # On Python 3.9+ `await asyncio.to_thread(blocking_io)` is equivalent.
    result = await loop.run_in_executor(None, blocking_io)
    print(result)

    # With custom executor (e.g. to bound the number of worker threads)
    with ThreadPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(executor, blocking_io)

asyncio.run(main())

Periodic Tasks

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def periodic_task(interval, func):
    """Invoke *func* forever, sleeping *interval* seconds between calls."""
    while True:
        await func()
        await asyncio.sleep(interval)

async def heartbeat():
    """Emit one heartbeat message."""
    print("Heartbeat")

async def main():
    """Run a heartbeat in the background while doing other work."""
    # Launch the periodic task in the background.
    beat = asyncio.create_task(periodic_task(5, heartbeat))

    # Pretend to do 20 s of other work.
    await asyncio.sleep(20)

    # cancel() raises CancelledError inside the task at its next await point.
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        print("Periodic task cancelled")

asyncio.run(main())

Graceful Shutdown

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import signal

async def shutdown(signal, loop):
    """Cancel every other task, wait for them to finish, then stop the loop.

    NOTE(review): the parameter name shadows the imported ``signal`` module;
    only the received signal object's ``.name`` is used here so it works,
    but a name like ``sig`` would be clearer.
    """
    print(f"Received {signal.name}")

    # Every task except this shutdown task itself.
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]

    for task in tasks:
        task.cancel()

    # Wait for the cancellations to complete; return_exceptions=True keeps
    # the resulting CancelledErrors from propagating out of gather().
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()

async def main():
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() here is deprecated since Python 3.10.
    loop = asyncio.get_running_loop()

    # add_signal_handler is only available on Unix event loops.
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            # Default-arg binding (s=sig) captures the current sig, avoiding
            # the late-binding-closure pitfall.
            lambda s=sig: asyncio.create_task(shutdown(s, loop))
        )

    # Your long-running tasks here
    await asyncio.sleep(3600)

asyncio.run(main())

Common Pitfalls

Don’t Block the Event Loop

1
2
3
4
5
6
7
8
9
# BAD - blocks entire event loop
async def bad():
    time.sleep(5)  # Blocking! Every other task in the loop stalls for 5 s.
    return "done"

# GOOD - use async sleep or run_in_executor
async def good():
    await asyncio.sleep(5)  # Suspends only this coroutine; others keep running.
    return "done"

Don’t Forget to Await

1
2
3
4
5
6
7
# BAD - coroutine never runs
async def main():
    fetch_data()  # Missing await! Builds a coroutine object that is discarded.

# GOOD
async def main():
    await fetch_data()

Create Tasks Properly

1
2
3
4
5
6
7
8
9
# BAD - task may be garbage collected
async def main():
    asyncio.create_task(background_work())
    # Task might not complete - the loop holds only a weak reference to tasks.

# GOOD - keep reference
async def main():
    task = asyncio.create_task(background_work())
    await task  # or store in set (discard via task.add_done_callback)

Don’t Mix Sync and Async

1
2
3
4
5
6
7
# BAD - calling async from sync incorrectly
def sync_function():
    result = async_function()  # Returns coroutine, not result

# GOOD - use asyncio.run or run_in_executor
def sync_function():
    # NOTE: asyncio.run raises RuntimeError if a loop is already running here.
    result = asyncio.run(async_function())

Testing Async Code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import asyncio
import unittest  # was missing: required by IsolatedAsyncioTestCase below

import pytest

# Requires the pytest-asyncio plugin for the asyncio marker to take effect.
@pytest.mark.asyncio
async def test_async_function():
    result = await my_async_function()
    assert result == expected

# Or with unittest (IsolatedAsyncioTestCase is available since Python 3.8)
class TestAsync(unittest.IsolatedAsyncioTestCase):
    async def test_something(self):
        result = await my_async_function()
        self.assertEqual(result, expected)

Quick Reference

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Run async function (entry point from synchronous code)
asyncio.run(main())

# Concurrent execution (results returned in call order)
await asyncio.gather(task1(), task2())

# Create background task (keep the reference so it is not garbage collected)
task = asyncio.create_task(coro())

# Timeout (raises asyncio.TimeoutError when exceeded)
await asyncio.wait_for(coro(), timeout=5.0)

# Limit concurrency
semaphore = asyncio.Semaphore(10)
async with semaphore:
    ...

# Run blocking code in a thread pool (None = loop's default executor)
await loop.run_in_executor(None, blocking_func)

# Sleep without blocking the event loop
await asyncio.sleep(1)

Asyncio shines for I/O-bound workloads—HTTP requests, database queries, file operations. It won’t help with CPU-bound work (use multiprocessing for that).

Start simple with gather(), add semaphores when you need rate limiting, and always remember: don’t block the event loop.