There are only two hard things in computer science: cache invalidation, naming things, and off-by-one errors.

Caching is straightforward when your data never changes. Real systems aren’t that simple. Data changes, caches get stale, and suddenly your users see yesterday’s prices or last week’s profile pictures.

Here’s how to build caching that scales without becoming a source of bugs and outages.

Cache-Aside: The Default Pattern

Most applications should start here:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
async def get_user(user_id: str) -> User:
    # 1. Check cache
    cached = await redis.get(f"user:{user_id}")
    if cached:
        return User.parse_raw(cached)
    
    # 2. Cache miss: fetch from database
    user = await db.fetch_one("SELECT * FROM users WHERE id = $1", user_id)
    
    # 3. Populate cache
    await redis.setex(
        f"user:{user_id}",
        3600,  # 1 hour TTL
        user.json()
    )
    
    return user

Pros: Simple, cache only contains accessed data, database is source of truth.

Cons: First request always hits database, potential thundering herd on cache miss.

Handling Updates

When data changes, invalidate the cache:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
async def update_user(user_id: str, data: dict) -> User:
    # Update database
    user = await db.execute(
        "UPDATE users SET ... WHERE id = $1 RETURNING *",
        user_id, data
    )
    
    # Invalidate cache (don't update - simpler and safer)
    await redis.delete(f"user:{user_id}")
    
    return user

Delete rather than update. It’s simpler and avoids race conditions where the cache write happens before the database commit.

Write-Through: Consistency First

When you need strong consistency between cache and database:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
async def update_user_write_through(user_id: str, data: dict) -> User:
    async with db.transaction():
        # Update database
        user = await db.execute(
            "UPDATE users SET ... WHERE id = $1 RETURNING *",
            user_id, data
        )
        
        # Update cache in same logical transaction
        await redis.setex(
            f"user:{user_id}",
            3600,
            user.json()
        )
    
    return user

Pros: Cache is always current (within transaction boundaries).

Cons: Higher write latency, cache contains data that may never be read.

Write-Behind: Performance First

For high-write scenarios where eventual consistency is acceptable:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class WriteBehindCache:
    def __init__(self):
        self.pending_writes = asyncio.Queue()
        asyncio.create_task(self._flush_loop())
    
    async def update(self, user_id: str, data: dict):
        # Update cache immediately
        await redis.setex(f"user:{user_id}", 3600, json.dumps(data))
        
        # Queue database write
        await self.pending_writes.put((user_id, data))
    
    async def _flush_loop(self):
        while True:
            batch = []
            # Collect pending writes
            while not self.pending_writes.empty() and len(batch) < 100:
                batch.append(await self.pending_writes.get())
            
            if batch:
                # Batch write to database
                await self._batch_write(batch)
            
            await asyncio.sleep(0.1)  # Flush every 100ms

Pros: Very fast writes, batching reduces database load.

Cons: Data loss risk if cache fails before flush, complex failure handling.

The Thundering Herd Problem

Cache expires. 1,000 requests hit simultaneously. All 1,000 go to the database. Database dies.

Solution 1: Locking

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def get_user_with_lock(user_id: str) -> User:
    cache_key = f"user:{user_id}"
    lock_key = f"lock:{cache_key}"
    
    # Check cache
    cached = await redis.get(cache_key)
    if cached:
        return User.parse_raw(cached)
    
    # Try to acquire lock
    acquired = await redis.set(lock_key, "1", nx=True, ex=5)
    
    if acquired:
        try:
            # We got the lock - fetch and cache
            user = await db.fetch_one("SELECT * FROM users WHERE id = $1", user_id)
            await redis.setex(cache_key, 3600, user.json())
            return user
        finally:
            await redis.delete(lock_key)
    else:
        # Someone else is fetching - wait and retry
        await asyncio.sleep(0.1)
        return await get_user_with_lock(user_id)

Solution 2: Probabilistic Early Expiration

Refresh cache before it expires:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import random

async def get_user_early_refresh(user_id: str) -> User:
    cache_key = f"user:{user_id}"
    
    cached = await redis.get(cache_key)
    ttl = await redis.ttl(cache_key)
    
    if cached:
        # Probabilistically refresh if TTL is low
        # More likely to refresh as expiration approaches
        if ttl < 300 and random.random() < (300 - ttl) / 300:
            asyncio.create_task(refresh_cache(user_id))
        
        return User.parse_raw(cached)
    
    return await fetch_and_cache(user_id)

As TTL drops from 300 to 0 seconds, refresh probability increases from 0% to 100%. Spreads refresh load over time.

Multi-Level Caching

Not all caches are equal. Layer them:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class MultiLevelCache:
    def __init__(self):
        self.l1 = {}  # In-memory, per-instance
        self.l2 = redis.Redis()  # Shared Redis
    
    async def get(self, key: str) -> str | None:
        # L1: In-memory (microseconds)
        if key in self.l1:
            value, expires = self.l1[key]
            if time.time() < expires:
                return value
            del self.l1[key]
        
        # L2: Redis (milliseconds)
        value = await self.l2.get(key)
        if value:
            # Populate L1
            self.l1[key] = (value, time.time() + 60)  # Short L1 TTL
            return value
        
        return None
    
    async def set(self, key: str, value: str, ttl: int):
        # Write to both levels
        self.l1[key] = (value, time.time() + min(ttl, 60))
        await self.l2.setex(key, ttl, value)
    
    async def delete(self, key: str):
        # Invalidate both levels
        self.l1.pop(key, None)
        await self.l2.delete(key)

L1 handles hot data with zero network latency. L2 handles warm data with shared state across instances.

Warning: L1 invalidation is tricky in multi-instance deployments. Either accept short-term inconsistency (low L1 TTL) or use pub/sub for invalidation.

Cache Key Design

Bad keys cause collisions and make debugging painful:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# Bad: No namespace, collision-prone
key = user_id

# Bad: No version, can't invalidate schema changes
key = f"user:{user_id}"

# Good: Namespaced, versioned, readable
key = f"myapp:v1:user:{user_id}"

# Good: Include relevant parameters
key = f"myapp:v1:user:{user_id}:profile:full"
key = f"myapp:v1:user:{user_id}:profile:summary"

Version your cache keys. When your User model changes, bump v1 to v2. Old cached data naturally expires.

Monitoring Your Cache

Blind caching is dangerous caching:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class InstrumentedCache:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    async def get(self, key: str) -> str | None:
        start = time.time()
        value = await self.redis.get(key)
        duration = time.time() - start
        
        metrics.histogram('cache.get.duration', duration)
        
        if value:
            metrics.increment('cache.hit')
        else:
            metrics.increment('cache.miss')
        
        return value

Track:

  • Hit rate: Below 80%? Cache isn’t helping much.
  • Latency: P99 above 10ms? Check Redis health.
  • Memory usage: Approaching limit? Evictions will spike.
  • Evictions: High eviction rate? Need more memory or shorter TTLs.

When NOT to Cache

Caching isn’t free. Skip it when:

  • Data changes constantly: Cache hit rate will be terrible
  • Data is unique per request: Search results, personalized content
  • Consistency is critical: Financial transactions, inventory counts
  • Database is fast enough: Don’t optimize what isn’t slow

The best cache is no cache. Only add caching when you have a measured performance problem.

The Checklist

Before deploying caching:

  • Pattern chosen (cache-aside for most cases)
  • TTLs set appropriately (balance freshness vs hit rate)
  • Invalidation strategy defined (delete on write)
  • Thundering herd mitigated (locking or early refresh)
  • Keys are namespaced and versioned
  • Monitoring in place (hit rate, latency, memory)
  • Failure mode handled (fallback to database)
  • Cache warming considered for cold starts

Caching done right is invisible. Users get fast responses, databases stay healthy, and you don’t get paged at 3 AM because someone deployed a cache key typo.