There are only two hard things in computer science: cache invalidation and naming things.

Here’s how to get the first one right (most of the time).

Why Cache?

Caching trades memory for speed. Instead of computing or fetching something every time, you store the result and reuse it.

Good candidates for caching:

  • Database queries that don’t change often
  • API responses from external services
  • Computed values that are expensive to generate
  • Static assets (images, CSS, JS)

Bad candidates:

  • Data that changes constantly
  • User-specific data that varies per request
  • Security-sensitive information

Cache Layers

UserECdDgNeApRpeldiicsa/tMieomncaCcahcehdeDQautearbyasCeacChaecheDatabase

Each layer catches requests before they hit the next, more expensive layer.

In-Memory Caching (Application Level)

Python with functools

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from functools import lru_cache
import time

@lru_cache(maxsize=128)
def get_user_permissions(user_id: int) -> list[str]:
    """Expensive database query, cached in memory."""
    time.sleep(0.1)  # Simulate slow query
    return ["read", "write", "admin"]

# First call: slow
permissions = get_user_permissions(123)  # 100ms

# Second call: instant (from cache)
permissions = get_user_permissions(123)  # <1ms

# Clear cache when needed
get_user_permissions.cache_clear()

Time-based expiration

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from cachetools import TTLCache
from threading import Lock

cache = TTLCache(maxsize=1000, ttl=300)  # 5 minute TTL
cache_lock = Lock()

def get_user_profile(user_id: int) -> dict:
    cache_key = f"user:{user_id}"
    
    with cache_lock:
        if cache_key in cache:
            return cache[cache_key]
    
    # Cache miss - fetch from database
    profile = db.query(f"SELECT * FROM users WHERE id = {user_id}")
    
    with cache_lock:
        cache[cache_key] = profile
    
    return profile

Redis Caching (Distributed)

Redis is the go-to for distributed caching — fast, persistent, and feature-rich.

Basic Operations

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

def cache_get(key: str) -> dict | None:
    """Get value from cache."""
    data = r.get(key)
    return json.loads(data) if data else None

def cache_set(key: str, value: dict, ttl: int = 300):
    """Set value with expiration."""
    r.setex(key, ttl, json.dumps(value))

def cache_delete(key: str):
    """Invalidate cache entry."""
    r.delete(key)

# Usage
user = cache_get("user:123")
if not user:
    user = fetch_user_from_db(123)
    cache_set("user:123", user, ttl=600)

Cache-Aside Pattern

The most common pattern — application manages the cache explicitly.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def get_product(product_id: int) -> dict:
    cache_key = f"product:{product_id}"
    
    # 1. Check cache
    cached = cache_get(cache_key)
    if cached:
        return cached
    
    # 2. Cache miss - fetch from source
    product = db.get_product(product_id)
    
    # 3. Populate cache
    if product:
        cache_set(cache_key, product, ttl=3600)
    
    return product

def update_product(product_id: int, data: dict):
    # 1. Update database
    db.update_product(product_id, data)
    
    # 2. Invalidate cache
    cache_delete(f"product:{product_id}")

Write-Through Pattern

Writes go to cache AND database together.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def save_user(user_id: int, data: dict):
    cache_key = f"user:{user_id}"
    
    # Write to both simultaneously
    db.save_user(user_id, data)
    cache_set(cache_key, data, ttl=3600)

def get_user(user_id: int) -> dict:
    cache_key = f"user:{user_id}"
    
    cached = cache_get(cache_key)
    if cached:
        return cached
    
    user = db.get_user(user_id)
    cache_set(cache_key, user, ttl=3600)
    return user

Cache Invalidation Strategies

Time-Based (TTL)

Simplest approach — data expires after a set time.

1
2
3
4
5
# Short TTL for frequently changing data
cache_set("stock_price:AAPL", price, ttl=60)  # 1 minute

# Long TTL for stable data
cache_set("country_list", countries, ttl=86400)  # 24 hours

Pros: Simple, automatic cleanup Cons: Stale data until expiration

Event-Based Invalidation

Invalidate when the underlying data changes.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def update_user(user_id: int, data: dict):
    db.update_user(user_id, data)
    
    # Invalidate all related caches
    cache_delete(f"user:{user_id}")
    cache_delete(f"user:{user_id}:profile")
    cache_delete(f"user:{user_id}:permissions")
    
    # Publish event for other services
    redis.publish("user_updated", json.dumps({"user_id": user_id}))

Tag-Based Invalidation

Group related cache entries for bulk invalidation.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def cache_set_with_tags(key: str, value: dict, tags: list[str], ttl: int = 300):
    """Set cache with tags for group invalidation."""
    r.setex(key, ttl, json.dumps(value))
    
    for tag in tags:
        r.sadd(f"tag:{tag}", key)
        r.expire(f"tag:{tag}", ttl)

def invalidate_by_tag(tag: str):
    """Invalidate all cache entries with a tag."""
    keys = r.smembers(f"tag:{tag}")
    if keys:
        r.delete(*keys)
    r.delete(f"tag:{tag}")

# Usage
cache_set_with_tags(
    "product:123",
    product_data,
    tags=["products", "category:electronics"],
    ttl=3600
)

# Invalidate all products
invalidate_by_tag("products")

HTTP Caching

Cache-Control Headers

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from flask import Flask, make_response

app = Flask(__name__)

@app.route("/api/products/<int:id>")
def get_product(id):
    product = fetch_product(id)
    response = make_response(jsonify(product))
    
    # Cache for 1 hour, allow CDN caching
    response.headers["Cache-Control"] = "public, max-age=3600"
    
    # ETag for conditional requests
    response.headers["ETag"] = f'"{hash(str(product))}"'
    
    return response

@app.route("/api/user/profile")
def get_profile():
    profile = fetch_user_profile()
    response = make_response(jsonify(profile))
    
    # Private - don't cache on CDN
    response.headers["Cache-Control"] = "private, max-age=300"
    
    return response

CDN Configuration (CloudFront)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Terraform
resource "aws_cloudfront_distribution" "cdn" {
  origin {
    domain_name = aws_s3_bucket.static.bucket_regional_domain_name
    origin_id   = "S3Origin"
  }

  default_cache_behavior {
    allowed_methods  = ["GET", "HEAD"]
    cached_methods   = ["GET", "HEAD"]
    target_origin_id = "S3Origin"

    forwarded_values {
      query_string = false
      cookies {
        forward = "none"
      }
    }

    viewer_protocol_policy = "redirect-to-https"
    min_ttl                = 0
    default_ttl            = 3600   # 1 hour
    max_ttl                = 86400  # 24 hours
  }
}

Common Pitfalls

1. Cache Stampede

When cache expires, many requests hit the database simultaneously.

Solution: Locking

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading

locks = {}

def get_with_lock(key: str, fetch_func):
    cached = cache_get(key)
    if cached:
        return cached
    
    # Get or create lock for this key
    if key not in locks:
        locks[key] = threading.Lock()
    
    with locks[key]:
        # Double-check after acquiring lock
        cached = cache_get(key)
        if cached:
            return cached
        
        # Only one thread fetches
        value = fetch_func()
        cache_set(key, value)
        return value

2. Stale Data

Cache shows outdated information.

Solution: Shorter TTL + Background refresh

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import threading

def get_with_background_refresh(key: str, fetch_func, ttl: int = 300):
    cached = cache_get(key)
    remaining_ttl = r.ttl(key)
    
    if cached:
        # Refresh in background if TTL < 20%
        if remaining_ttl < ttl * 0.2:
            threading.Thread(
                target=lambda: cache_set(key, fetch_func(), ttl)
            ).start()
        return cached
    
    value = fetch_func()
    cache_set(key, value, ttl)
    return value

3. Memory Exhaustion

Cache grows unbounded.

Solution: Set maxmemory in Redis

1
2
3
# redis.conf
maxmemory 256mb
maxmemory-policy allkeys-lru

When NOT to Cache

  • Write-heavy workloads — Cache invalidation overhead outweighs benefits
  • Highly personalized data — Low cache hit rate
  • Real-time requirements — Stale data is unacceptable
  • Small datasets — Database is fast enough

The Decision Framework

IstheNYoedsataDoDenox'eptsenctsahYNiceeovhseedattLoaSohnfcogehretatrcnhgTT/eTTcLLof,mropersuqituemeevp?neltneltry-?biansveadliidnavtailoindation

Start simple. Measure. Cache only what’s slow. Invalidate aggressively.