There are only two hard things in computer science: cache invalidation, naming things, and off-by-one errors.
Caching is straightforward when your data never changes. Real systems aren’t that simple. Data changes, caches get stale, and suddenly your users see yesterday’s prices or last week’s profile pictures.
Here’s how to build caching that scales without becoming a source of bugs and outages.
Cache-Aside: The Default Pattern#
Most applications should start here:
```python
async def get_user(user_id: str) -> User:
    # 1. Check cache
    cached = await redis.get(f"user:{user_id}")
    if cached:
        return User.parse_raw(cached)

    # 2. Cache miss: fetch from database
    user = await db.fetch_one("SELECT * FROM users WHERE id = $1", user_id)

    # 3. Populate cache
    await redis.setex(
        f"user:{user_id}",
        3600,  # 1 hour TTL
        user.json()
    )
    return user
```
Pros: Simple, cache only contains accessed data, database is source of truth.
Cons: First request always hits database, potential thundering herd on cache miss.
Handling Updates#
When data changes, invalidate the cache:
```python
async def update_user(user_id: str, data: dict) -> User:
    # Update database
    user = await db.execute(
        "UPDATE users SET ... WHERE id = $1 RETURNING *",
        user_id, data
    )

    # Invalidate cache (don't update - simpler and safer)
    await redis.delete(f"user:{user_id}")
    return user
```
Delete rather than update. It’s simpler and avoids race conditions where the cache write happens before the database commit.
Write-Through: Consistency First#
When you need strong consistency between cache and database:
```python
async def update_user_write_through(user_id: str, data: dict) -> User:
    async with db.transaction():
        # Update database
        user = await db.execute(
            "UPDATE users SET ... WHERE id = $1 RETURNING *",
            user_id, data
        )
        # Update cache in same logical transaction
        await redis.setex(
            f"user:{user_id}",
            3600,
            user.json()
        )
    return user
```
Pros: Cache is always current (within transaction boundaries).
Cons: Higher write latency, cache contains data that may never be read.
Write-Behind: Speed First#
For high-write scenarios where eventual consistency is acceptable:
```python
class WriteBehindCache:
    def __init__(self):
        self.pending_writes = asyncio.Queue()
        asyncio.create_task(self._flush_loop())

    async def update(self, user_id: str, data: dict):
        # Update cache immediately
        await redis.setex(f"user:{user_id}", 3600, json.dumps(data))
        # Queue database write
        await self.pending_writes.put((user_id, data))

    async def _flush_loop(self):
        while True:
            batch = []
            # Collect pending writes
            while not self.pending_writes.empty() and len(batch) < 100:
                batch.append(await self.pending_writes.get())
            if batch:
                # Batch write to database
                await self._batch_write(batch)
            await asyncio.sleep(0.1)  # Flush every 100ms
```
Pros: Very fast writes, batching reduces database load.
Cons: Data loss risk if cache fails before flush, complex failure handling.
The Thundering Herd Problem#
Cache expires. 1,000 requests hit simultaneously. All 1,000 go to the database. Database dies.
Solution 1: Locking#
```python
async def get_user_with_lock(user_id: str) -> User:
    cache_key = f"user:{user_id}"
    lock_key = f"lock:{cache_key}"

    # Check cache
    cached = await redis.get(cache_key)
    if cached:
        return User.parse_raw(cached)

    # Try to acquire lock
    acquired = await redis.set(lock_key, "1", nx=True, ex=5)
    if acquired:
        try:
            # We got the lock - fetch and cache
            user = await db.fetch_one("SELECT * FROM users WHERE id = $1", user_id)
            await redis.setex(cache_key, 3600, user.json())
            return user
        finally:
            await redis.delete(lock_key)
    else:
        # Someone else is fetching - wait and retry
        await asyncio.sleep(0.1)
        return await get_user_with_lock(user_id)
```
Solution 2: Probabilistic Early Expiration#
Refresh cache before it expires:
```python
import random

async def get_user_early_refresh(user_id: str) -> User:
    cache_key = f"user:{user_id}"
    cached = await redis.get(cache_key)
    ttl = await redis.ttl(cache_key)

    if cached:
        # Probabilistically refresh if TTL is low
        # More likely to refresh as expiration approaches
        if ttl < 300 and random.random() < (300 - ttl) / 300:
            asyncio.create_task(refresh_cache(user_id))
        return User.parse_raw(cached)

    return await fetch_and_cache(user_id)
```
As TTL drops from 300 to 0 seconds, refresh probability increases from 0% to 100%. Spreads refresh load over time.
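The same linear ramp can be factored into a small helper so the threshold is easy to tune. A minimal sketch, assuming the 300-second window from the example above; the `REFRESH_WINDOW` name is illustrative:

```python
REFRESH_WINDOW = 300  # seconds before expiry when early refreshes may begin

def refresh_probability(ttl: int) -> float:
    """Probability of triggering a background refresh at a given TTL.

    0.0 while more than REFRESH_WINDOW seconds remain, rising linearly
    to 1.0 as the key reaches expiration.
    """
    if ttl < 0:
        # Redis TTL sentinels: -1 (no expiry) and -2 (key missing).
        # Neither case benefits from an early refresh.
        return 0.0
    if ttl >= REFRESH_WINDOW:
        return 0.0
    return (REFRESH_WINDOW - ttl) / REFRESH_WINDOW
```

For example, `refresh_probability(300)` is `0.0`, `refresh_probability(150)` is `0.5`, and `refresh_probability(0)` is `1.0` — the linear ramp described above.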
Multi-Level Caching#
Not all caches are equal. Layer them:
```python
class MultiLevelCache:
    def __init__(self):
        self.l1 = {}             # In-memory, per-instance
        self.l2 = redis.Redis()  # Shared Redis

    async def get(self, key: str) -> str | None:
        # L1: In-memory (microseconds)
        if key in self.l1:
            value, expires = self.l1[key]
            if time.time() < expires:
                return value
            del self.l1[key]

        # L2: Redis (milliseconds)
        value = await self.l2.get(key)
        if value:
            # Populate L1
            self.l1[key] = (value, time.time() + 60)  # Short L1 TTL
            return value
        return None

    async def set(self, key: str, value: str, ttl: int):
        # Write to both levels
        self.l1[key] = (value, time.time() + min(ttl, 60))
        await self.l2.setex(key, ttl, value)

    async def delete(self, key: str):
        # Invalidate both levels
        self.l1.pop(key, None)
        await self.l2.delete(key)
```
L1 handles hot data with zero network latency. L2 handles warm data with shared state across instances.
Warning: L1 invalidation is tricky in multi-instance deployments. Either accept short-term inconsistency (low L1 TTL) or use pub/sub for invalidation.
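If you go the pub/sub route, the shape is roughly this — a sketch, not a production implementation. The channel name is an assumption, and the `listen` wiring assumes redis-py's asyncio pub/sub API; each instance drops the key locally, publishes it, and every other instance drops it from its own L1 on receipt:

```python
INVALIDATION_CHANNEL = "cache:invalidate"  # assumed channel name

class L1Invalidator:
    """Fan out L1 invalidations to every instance via Redis pub/sub."""

    def __init__(self, redis_client, l1: dict):
        self.redis = redis_client
        self.l1 = l1

    async def delete(self, key: str):
        # Drop locally first, then tell every other instance to do the same.
        self.l1.pop(key, None)
        await self.redis.publish(INVALIDATION_CHANNEL, key)

    def handle_invalidation(self, key: str):
        # Called for each key received on the invalidation channel.
        self.l1.pop(key, None)

    async def listen(self):
        # Run as a background task on every instance.
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(INVALIDATION_CHANNEL)
        async for message in pubsub.listen():
            if message["type"] == "message":
                self.handle_invalidation(message["data"].decode())
```

Note the trade-off: pub/sub delivery is fire-and-forget, so an instance that is briefly disconnected can miss an invalidation — which is why the short-L1-TTL fallback still matters even with this in place.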
Cache Key Design#
Bad keys cause collisions and make debugging painful:
```python
# Bad: No namespace, collision-prone
key = user_id

# Bad: No version, can't invalidate schema changes
key = f"user:{user_id}"

# Good: Namespaced, versioned, readable
key = f"myapp:v1:user:{user_id}"

# Good: Include relevant parameters
key = f"myapp:v1:user:{user_id}:profile:full"
key = f"myapp:v1:user:{user_id}:profile:summary"
```
Version your cache keys. When your User model changes, bump v1 to v2. Old cached data naturally expires.
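A tiny helper keeps the convention in one place, so bumping the version is a one-line change. A sketch, reusing the `myapp`/`v1` examples above as defaults rather than a fixed convention:

```python
def cache_key(*parts: str, namespace: str = "myapp", version: str = "v1") -> str:
    """Build a namespaced, versioned key: namespace:version:part:part:..."""
    return ":".join((namespace, version, *parts))
```

Usage: `cache_key("user", "42")` yields `"myapp:v1:user:42"`, and `cache_key("user", "42", version="v2")` yields `"myapp:v2:user:42"` after a schema change.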
Monitoring Your Cache#
Blind caching is dangerous caching:
```python
class InstrumentedCache:
    def __init__(self, redis_client):
        self.redis = redis_client

    async def get(self, key: str) -> str | None:
        start = time.time()
        value = await self.redis.get(key)
        duration = time.time() - start

        metrics.histogram('cache.get.duration', duration)
        if value:
            metrics.increment('cache.hit')
        else:
            metrics.increment('cache.miss')
        return value
```
Track:
- Hit rate: Below 80%? Cache isn’t helping much.
- Latency: P99 above 10ms? Check Redis health.
- Memory usage: Approaching limit? Evictions will spike.
- Evictions: High eviction rate? Need more memory or shorter TTLs.
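Hit rate itself is just derived from the two counters emitted above; a minimal sketch of the arithmetic, guarding against the zero-traffic case:

```python
def hit_rate(hits: int, misses: int) -> float:
    """Fraction of lookups served from cache; 0.0 when there's no traffic yet."""
    total = hits + misses
    return hits / total if total else 0.0
```

For example, `hit_rate(800, 200)` is `0.8` — right at the threshold where the cache starts paying for itself.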
When NOT to Cache#
Caching isn’t free. Skip it when:
- Data changes constantly: Cache hit rate will be terrible
- Data is unique per request: Search results, personalized content
- Consistency is critical: Financial transactions, inventory counts
- Database is fast enough: Don’t optimize what isn’t slow
The best cache is no cache. Only add caching when you have a measured performance problem.
The Checklist#
Before deploying caching:
- An invalidation strategy defined for every write path
- Thundering herd protection on hot keys
- Cache keys namespaced and versioned
- Hit rate, latency, memory, and eviction metrics wired up
- TTLs chosen deliberately, not defaulted
- A plan for what happens when the cache is down
Caching done right is invisible. Users get fast responses, databases stay healthy, and you don’t get paged at 3 AM because someone deployed a cache key typo.