Caching seems simple. Add Redis, cache everything, go fast. Then you get stale data, cache stampedes, and bugs that only happen in production. Here’s how to cache correctly.

When to Cache

Good candidates:

  • Expensive database queries (aggregations, joins)
  • External API responses
  • Computed values that don’t change often
  • Static content (templates, configs)
  • Session data

Bad candidates:

  • Data that changes frequently
  • User-specific data with many variations
  • Security-sensitive data
  • Data where staleness causes real problems

Rule of thumb: Cache when read frequency » write frequency.

The Basic Patterns

Cache-Aside (Lazy Loading)

Application manages the cache:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def get_user(user_id: str) -> User:
    """Cache-aside lookup: serve from Redis when possible, else hydrate from the DB."""
    key = f"user:{user_id}"

    hit = redis.get(key)
    if hit:
        return User.parse(hit)

    # Miss: the application (not the cache) loads from the source of truth…
    user = db.query(User).get(user_id)

    # …and writes it back with a 1-hour TTL for subsequent readers.
    redis.setex(key, 3600, user.json())
    return user

Pros: Only caches what’s actually used
Cons: First request always slow, cache stampede risk

Write-Through

Write to cache and DB together:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def update_user(user_id: str, data: dict) -> User:
    """Write-through update: persist to the DB, then refresh the cache in the same call."""
    # The database commit happens first — it is the source of truth.
    user = db.query(User).get(user_id)
    user.update(data)
    db.commit()

    # Keep the cached copy in lockstep (1-hour TTL).
    redis.setex(f"user:{user_id}", 3600, user.json())
    return user

Pros: Cache always fresh
Cons: Write latency increases, cache may hold unused data

Write-Behind (Write-Back)

Write to cache, async write to DB:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def update_user(user_id: str, data: dict) -> User:
    """Write-behind update: mutate the cached copy now, persist asynchronously."""
    # The cache is updated synchronously so readers see the change at once.
    user = get_cached_user(user_id)
    user.update(data)
    redis.setex(f"user:{user_id}", 3600, user.json())

    # Durability is deferred to a background worker.
    queue.enqueue("persist_user", user_id)
    return user

# Background worker
def persist_user(user_id: str):
    """Flush the cached user record back to the database (write-behind drain)."""
    raw = redis.get(f"user:{user_id}")
    db.merge(User.parse(raw))
    db.commit()

Pros: Fast writes
Cons: Data loss risk if cache dies before persist, complex

Cache Invalidation

The hard part. Two approaches:

TTL-Based (Simple)

1
2
# Cache for 1 hour, then refetch — staleness is bounded by the TTL window
redis.setex(f"user:{user_id}", 3600, user.json())

Pros: Simple, self-healing
Cons: Stale data during TTL window

Event-Based (Precise)

1
2
3
4
5
6
7
8
9
def update_user(user_id: str, data: dict):
    """Event-based invalidation: write to the DB, then drop (or announce) the entry."""
    db.update(user_id, data)

    # Remove the stale entry so the next read repopulates it…
    redis.delete(f"user:{user_id}")

    # …or broadcast so other cache layers can react themselves.
    redis.publish("user_updated", user_id)

Pros: Always fresh
Cons: Must track all write paths, easy to miss one

Hybrid (Practical)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# Short TTL + event invalidation
# TTL catches missed invalidations
# Events provide freshness for known writes

def get_user(user_id: str):
    """Hybrid read path: a short TTL backstops any invalidation that was missed."""
    key = f"user:{user_id}"
    hit = redis.get(key)
    if hit:
        return User.parse(hit)

    user = db.query(User).get(user_id)
    redis.setex(key, 300, user.json())  # 5 min TTL
    return user

def update_user(user_id: str, data: dict):
    """Hybrid write path: persist, then invalidate without waiting for the TTL."""
    db.update(user_id, data)
    redis.delete(f"user:{user_id}")  # Immediate invalidation

Solving Cache Stampede

When cache expires, many requests hit the DB simultaneously:

Cache expires at 12:00:00. Then:

  11:00:01 — Request A: cache miss, queries DB
  11:00:02 — Request B: cache miss, queries DB
  11:00:03 — Request C: cache miss, queries DB

All concurrent requests miss and pile onto the database at once.

Solution 1: Locking

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def get_user_with_lock(user_id: str, max_wait: float = 2.0):
    """Cache-aside read guarded by a distributed lock to prevent stampedes.

    Only one caller rebuilds an expired entry; the others poll the cache
    briefly and, after *max_wait* seconds, give up and read the DB directly.
    """
    cache_key = f"user:{user_id}"
    cached = redis.get(cache_key)
    if cached:
        return User.parse(cached)

    lock_key = f"lock:user:{user_id}"
    deadline = time.monotonic() + max_wait
    while True:
        # NX + EX in a single atomic call. The original setnx()-then-expire()
        # pair could crash between the two commands and leave an unexpiring
        # lock, deadlocking every future miss for this key.
        if redis.set(lock_key, "1", nx=True, ex=10):
            try:
                user = db.query(User).get(user_id)
                redis.setex(cache_key, 3600, user.json())
                return user
            finally:
                redis.delete(lock_key)

        # Someone else is loading: poll the cache instead of recursing.
        # (The original recursed unboundedly — stack growth, and no exit
        # if the lock holder wedged.)
        time.sleep(0.1)
        cached = redis.get(cache_key)
        if cached:
            return User.parse(cached)
        if time.monotonic() >= deadline:
            # Lock holder is taking too long; degrade to a direct DB read.
            return db.query(User).get(user_id)

Solution 2: Probabilistic Early Refresh

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def get_user_with_early_refresh(user_id: str):
    """Read path that probabilistically refreshes entries shortly before expiry."""
    cached, ttl = redis.get_with_ttl(f"user:{user_id}")

    if not cached:
        # Cache miss
        return load_and_cache_user(user_id)

    # Probabilistically refresh before expiry
    near_expiry = ttl < 300
    if near_expiry and random.random() < 0.1:  # 10% chance in last 5 min
        refresh_async(user_id)
    return User.parse(cached)

Solution 3: Background Refresh

1
2
3
4
5
6
7
8
# Never let cache expire - refresh before it does

@scheduler.task(run_every=minutes(5))
def refresh_hot_users():
    """Proactively re-cache frequently accessed users before their TTL lapses."""
    for uid in get_frequently_accessed_users():
        fresh = db.query(User).get(uid)
        redis.setex(f"user:{uid}", 3600, fresh.json())

Cache Keys

Good keys are:

  • Predictable (you can construct them without the data)
  • Namespaced (avoid collisions)
  • Versioned (for schema changes)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Bad
key = f"{user_id}"  # Collides with other entities
# Better: entity prefix gives each type its own namespace
key = f"user:{user_id}"

# Best
key = f"v2:user:{user_id}"  # Versioned for schema changes

# For queries: every parameter that changes the result belongs in the key
key = f"v1:users:active:page:{page}:limit:{limit}"

Multi-Level Caching

Request → Local cache (fast) → Redis (fast) → Database (slow)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from functools import lru_cache

# L1: In-process cache (fastest, per-instance)
@lru_cache(maxsize=1000)
def get_config(key: str) -> str:
    """Memoized per-process front for the shared Redis-backed config lookup."""
    return get_config_from_redis(key)

# L2: Redis (shared across instances)
def get_config_from_redis(key: str) -> str:
    """Shared-cache config lookup; falls through to the database on a miss."""
    redis_key = f"config:{key}"
    hit = redis.get(redis_key)
    if hit:
        return hit

    # L3: Database
    value = db.query(Config).get(key).value
    redis.setex(redis_key, 3600, value)
    return value

# Invalidate both levels
def update_config(key: str, value: str):
    """Write a config value and purge every cache tier that may still hold it."""
    db.update(key, value)
    redis.delete(f"config:{key}")
    # L1 is per-process memory, so it must be cleared explicitly too.
    get_config.cache_clear()  # Clear L1

Cache Warming

Pre-populate cache before traffic hits:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# On deployment
def warm_cache():
    """Pre-populate Redis with hot users and static config before traffic arrives."""
    # Load frequently accessed data
    hottest = db.query(User).order_by(User.access_count.desc()).limit(1000)
    for u in hottest:
        redis.setex(f"user:{u.id}", 3600, u.json())

    # Load static data (longer TTL: it changes rarely)
    for cfg in db.query(Config).all():
        redis.setex(f"config:{cfg.key}", 86400, cfg.value)

Monitoring

Key metrics:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Hit rate (should be >90%) — the single most important cache health number
hit_rate = cache_hits / (cache_hits + cache_misses)

# Latency — tail latency matters more than the average here
cache_latency_p99 = measure_percentile(cache_response_times, 99)

# Memory usage
memory_used = redis.info()['used_memory']

# Evictions (sign of undersized cache)
evictions = redis.info()['evicted_keys']

Alert when:

  • Hit rate drops below 80%
  • Evictions spike
  • Latency exceeds 10ms p99

Common Mistakes

1. Caching Nulls Incorrectly

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Bad: Cache miss on every request for non-existent user
def get_user(user_id):
    cached = redis.get(f"user:{user_id}")
    if cached:
        return User.parse(cached)
    
    user = db.query(User).get(user_id)
    if user:
        redis.setex(f"user:{user_id}", 3600, user.json())
    return user  # Never caches "not found" — every repeat lookup hits the DB

# Good: Cache negative results too
def get_user(user_id):
    """Lookup that stores a sentinel for "not found" so misses stop hitting the DB."""
    key = f"user:{user_id}"
    cached = redis.get(key)
    if cached == "NULL":
        # Negative hit: we already know this user doesn't exist.
        return None
    if cached:
        return User.parse(cached)

    user = db.query(User).get(user_id)
    if user:
        redis.setex(key, 3600, user.json())
    else:
        redis.setex(key, 300, "NULL")  # Shorter TTL for negatives
    return user

2. Forgetting Serialization Costs

1
2
3
4
5
6
7
# Bad: Serialize/deserialize huge objects every time
redis.set("big_data", pickle.dumps(huge_object))
obj = pickle.loads(redis.get("big_data"))  # CPU expensive
# NOTE(review): pickle.loads on data from a shared cache is also unsafe if
# anything untrusted can write to that key — it executes arbitrary code.

# Better: Store only what you need
redis.set("user:123:name", user.name)
redis.set("user:123:email", user.email)

3. No Fallback

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# Bad: Cache failure = application failure
def get_user(user_id):
    # Unguarded: a Redis outage raises, and a plain miss (None) makes
    # User.parse blow up as well.
    return User.parse(redis.get(f"user:{user_id}"))  # Crashes if Redis down

# Good: Graceful degradation
def get_user(user_id):
    """Serve from cache when Redis is healthy; otherwise degrade to the DB."""
    try:
        cached = redis.get(f"user:{user_id}")
        if cached:
            return User.parse(cached)
    except RedisError:
        pass  # Cache unavailable, fall through to DB

    return db.query(User).get(user_id)

The Caching Checklist

  • Clear invalidation strategy (TTL, events, or both)
  • Cache stampede protection
  • Versioned cache keys
  • Negative result caching
  • Graceful degradation when cache fails
  • Monitoring (hit rate, latency, evictions)
  • Cache warming for deployments

Start with cache-aside and TTL. Add complexity only when needed.


The fastest request is one that never hits your database. But the most dangerous bug is one that serves stale data. Balance accordingly.