Redis is often introduced as “a cache” but it’s really a data structure server. Understanding its primitives unlocks patterns far beyond simple key-value storage.

Basic Caching

The fundamental pattern: cache expensive operations.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import json
import logging

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user(user_id: str) -> dict:
    """Fetch a user record, serving from the Redis cache when possible.

    On a miss, the user is loaded from the database and cached for one
    hour before being returned.
    """
    key = f"user:{user_id}"

    hit = r.get(key)
    if hit:
        return json.loads(hit)

    # Not cached: go to the source of truth, then populate the cache.
    user = db.query_user(user_id)
    r.setex(key, 3600, json.dumps(user))
    return user

Cache-Aside Pattern

The application manages the cache explicitly:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def get_product(product_id: str) -> dict:
    """Cache-aside read for a product record.

    Order of operations: consult Redis, fall back to the database on a
    miss, then write the fresh record back with a one-hour TTL.
    """
    key = f"product:{product_id}"

    entry = r.get(key)
    if entry:
        return json.loads(entry)

    product = db.get_product(product_id)
    r.setex(key, 3600, json.dumps(product))
    return product

def update_product(product_id: str, data: dict):
    """Persist a product change and drop its cache entry.

    Deleting (rather than rewriting) the key lets the next read
    repopulate the cache from the source of truth.
    """
    db.update_product(product_id, data)
    r.delete(f"product:{product_id}")

Write-Through Pattern

Write to cache and database together:

1
2
3
4
5
6
def save_user_settings(user_id: str, settings: dict):
    """Write-through: persist settings to the database and mirror them
    into Redis with a 24-hour TTL in the same call."""
    key = f"settings:{user_id}"
    db.save_settings(user_id, settings)
    r.setex(key, 86400, json.dumps(settings))

Cache Invalidation Strategies

Time-Based Expiration

1
2
3
4
5
# Relative TTL: SETEX writes the value and its 3600-second expiry atomically.
r.setex("key", 3600, "value")

# Absolute deadline: EXPIREAT takes a Unix timestamp.
# (`tomorrow` is assumed to be a datetime defined elsewhere — confirm it
# is timezone-aware before relying on the exact expiry moment.)
r.expireat("key", int(tomorrow.timestamp()))

Event-Based Invalidation

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def on_user_updated(user_id: str):
    """Invalidate every cache entry derived from this user."""
    # The base record plus its dependent sub-caches.
    for suffix in ("", ":profile", ":settings"):
        r.delete(f"user:{user_id}{suffix}")

def on_product_category_changed(category_id: str):
    """Invalidate every cached product under a category.

    Bug fix: uses SCAN (via scan_iter) instead of KEYS. KEYS walks the
    entire keyspace in one blocking call and stalls the Redis event loop,
    which is unsafe in production; SCAN iterates incrementally with the
    same end result.
    """
    keys = list(r.scan_iter(match=f"product:category:{category_id}:*"))
    if keys:
        r.delete(*keys)

Tag-Based Invalidation

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def cache_with_tags(key: str, value: str, tags: list, ttl: int = 3600):
    """Cache `value` under `key` and index the key in one Redis SET per tag.

    Each tag set records every key carrying that tag so invalidate_tag()
    can delete them all in one shot.

    Args:
        key: Cache key to store the value under.
        value: Serialized payload.
        tags: Tag names this key should be grouped under.
        ttl: Expiry for the cached value, in seconds.
    """
    # Store the value itself.
    r.setex(key, ttl, value)

    for tag in tags:
        tag_key = f"tag:{tag}"
        r.sadd(tag_key, key)
        # Bug fix: only *extend* the tag set's TTL, never shorten it.
        # Unconditionally resetting it meant caching a short-lived key
        # could expire the tag set before a longer-lived member, silently
        # breaking invalidation for that member. TTL is -1 for a freshly
        # created set (no expiry yet), so that case also takes the branch.
        if r.ttl(tag_key) < ttl:
            r.expire(tag_key, ttl)

def invalidate_tag(tag: str):
    """Delete every cached key registered under `tag`, then the tag set itself."""
    tag_key = f"tag:{tag}"
    members = r.smembers(tag_key)
    if members:
        r.delete(*members)
    r.delete(tag_key)

# Usage (`data` is a pre-serialized product payload defined elsewhere)
cache_with_tags("product:123", data, ["category:electronics", "brand:apple"])
invalidate_tag("brand:apple")  # Clears all Apple products

Rate Limiting

Fixed Window

1
2
3
4
5
6
7
8
def is_rate_limited(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Fixed-window rate limit: at most `limit` requests per `window` seconds.

    The window index is baked into the key, so each window gets its own
    counter; the first increment attaches the expiry.
    """
    bucket = int(time.time()) // window
    key = f"ratelimit:{user_id}:{bucket}"

    count = r.incr(key)
    if count == 1:
        # Fresh window: make the counter clean itself up.
        r.expire(key, window)

    return count > limit

Sliding Window

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def is_rate_limited_sliding(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Sliding-window rate limit backed by a sorted set of timestamps.

    Every request is a ZSET member scored by its arrival time; the check
    counts how many requests landed in the last `window` seconds before
    recording the new one.
    """
    key = f"ratelimit:{user_id}"
    now = time.time()

    # One round trip: prune, count, record, refresh expiry.
    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, now - window)  # drop entries older than the window
    pipe.zcard(key)                              # size of the surviving window
    pipe.zadd(key, {str(now): now})              # record this request
    pipe.expire(key, window)                     # idle keys clean themselves up
    prior = pipe.execute()[1]

    return prior >= limit

Token Bucket

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def check_token_bucket(user_id: str, tokens_per_second: float = 10, bucket_size: int = 100) -> bool:
    """Token-bucket rate limit: allow a request if a token is available.

    Tokens refill continuously at `tokens_per_second`, capped at
    `bucket_size`. Returns True when a token was consumed, False when
    the bucket is empty.

    NOTE(review): the read-modify-write below is not atomic — two
    concurrent callers can read the same state and each consume a token.
    Move the logic into a Lua script if strict limits matter.
    """
    key = f"bucket:{user_id}"
    now = time.time()

    data = r.hgetall(key)

    if not data:
        # First sighting: a full bucket, minus this request's token.
        r.hset(key, mapping={"tokens": bucket_size - 1, "last_update": now})
        r.expire(key, 3600)
        return True

    tokens = float(data["tokens"])
    last_update = float(data["last_update"])

    # Refill for the elapsed interval, capped at the bucket size.
    elapsed = now - last_update
    tokens = min(bucket_size, tokens + elapsed * tokens_per_second)

    if tokens >= 1:
        r.hset(key, mapping={"tokens": tokens - 1, "last_update": now})
        # Bug fix: refresh the TTL on every consume. Originally it was
        # set only at creation, so a continuously active bucket's state
        # expired (and silently refilled) an hour after first use.
        r.expire(key, 3600)
        return True

    return False

Session Storage

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import secrets

def create_session(user_id: str, ttl: int = 86400) -> str:
    """Mint a session ID, store its metadata in a Redis hash, and
    register it in the owner's session set.

    Returns the opaque, URL-safe session ID.
    """
    session_id = secrets.token_urlsafe(32)
    session_key = f"session:{session_id}"

    r.hset(session_key, mapping={
        "user_id": user_id,
        "created_at": time.time(),
        "ip": request.remote_addr,
    })
    r.expire(session_key, ttl)

    # Index the session under its owner so all of a user's sessions can
    # be revoked at once. NOTE(review): this set never expires, so IDs of
    # expired sessions linger in it until a destroy_* call cleans them up.
    r.sadd(f"user_sessions:{user_id}", session_id)

    return session_id

def get_session(session_id: str) -> dict | None:
    """Return the session's hash fields, or None if it is missing or expired."""
    # HGETALL yields an empty dict for absent keys; normalize that to None.
    return r.hgetall(f"session:{session_id}") or None

def destroy_session(session_id: str):
    """Delete one session and unlink it from its owner's session set."""
    session = get_session(session_id)
    if session is not None:
        r.srem(f"user_sessions:{session['user_id']}", session_id)
    r.delete(f"session:{session_id}")

def destroy_all_user_sessions(user_id: str):
    """Revoke every session belonging to `user_id`, then drop the index set."""
    index_key = f"user_sessions:{user_id}"
    for sid in r.smembers(index_key):
        r.delete(f"session:{sid}")
    r.delete(index_key)

Distributed Locking

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import uuid

def acquire_lock(lock_name: str, timeout: int = 10) -> str | None:
    """Try to take a distributed lock.

    Returns a token proving ownership on success, or None if the lock is
    already held. The lock auto-expires after `timeout` seconds so a
    crashed holder cannot wedge it forever.
    """
    token = str(uuid.uuid4())
    # SET NX EX: atomically claim the key only if it is absent.
    acquired = r.set(f"lock:{lock_name}", token, nx=True, ex=timeout)
    return token if acquired else None

def release_lock(lock_name: str, lock_id: str) -> bool:
    """Release a lock, but only if `lock_id` still owns it.

    A plain DEL would be unsafe: if our lock expired and another process
    acquired it, we would delete *their* lock. The Lua script makes the
    compare-and-delete atomic on the server.
    """
    script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    deleted = r.eval(script, 1, f"lock:{lock_name}", lock_id)
    return deleted == 1

# Usage: try/finally guarantees the lock is released even when
# process_order() raises; if acquisition fails, the work is skipped.
lock_id = acquire_lock("process_order")
if lock_id:
    try:
        process_order()
    finally:
        release_lock("process_order", lock_id)

Pub/Sub for Real-Time

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# Publisher
def publish_event(channel: str, event: dict):
    """Broadcast `event` as JSON to every current subscriber of `channel`."""
    payload = json.dumps(event)
    r.publish(channel, payload)

# Subscriber
def listen_events(channel: str):
    """Block forever, dispatching each published payload to handle_event().

    Control messages (subscribe confirmations, etc.) are filtered out;
    only real payloads are decoded.
    """
    pubsub = r.pubsub()
    pubsub.subscribe(channel)

    for message in pubsub.listen():
        if message["type"] != "message":
            continue
        handle_event(json.loads(message["data"]))

# Usage: notify all "orders" subscribers that an order was created.
publish_event("orders", {"type": "created", "order_id": "123"})

Leaderboards with Sorted Sets

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def add_score(user_id: str, score: int):
    """Record (or overwrite) a player's score on the leaderboard ZSET."""
    entry = {user_id: score}
    r.zadd("leaderboard", entry)

def get_rank(user_id: str) -> int | None:
    """Return the player's 1-based leaderboard position (1 = top score),
    or None if the player has no recorded score.

    Bug fix: the return annotation claimed `int`, but the function
    returns None for unknown players — callers must handle that case.
    """
    # zrevrank is 0-indexed with the highest score first.
    rank = r.zrevrank("leaderboard", user_id)
    return rank + 1 if rank is not None else None

def get_top_players(count: int = 10) -> list:
    """Return the top `count` players as (user_id, score) pairs, best first."""
    # ZREVRANGE's end index is inclusive, hence count - 1.
    return r.zrevrange("leaderboard", 0, count - 1, withscores=True)

def get_nearby_players(user_id: str, count: int = 5) -> list:
    """Return up to `count` players on either side of `user_id` on the
    leaderboard (player included), best score first.

    Returns an empty list when the player is not ranked.
    """
    rank = r.zrevrank("leaderboard", user_id)
    if rank is None:
        return []

    # Clamp the upper side at the top of the board; Redis clamps an
    # end index past the last element automatically.
    window_start = max(0, rank - count)
    window_end = rank + count
    return r.zrevrange("leaderboard", window_start, window_end, withscores=True)

Counting Unique Items (HyperLogLog)

1
2
3
4
5
6
7
8
9
def track_unique_visitor(page: str, visitor_id: str):
    """Add a visitor to today's HyperLogLog for the page (idempotent)."""
    key = f"visitors:{page}:{date.today()}"
    r.pfadd(key, visitor_id)

def get_unique_visitors(page: str, day: date) -> int:
    """Estimate how many distinct visitors hit `page` on a single `day`."""
    key = f"visitors:{page}:{day}"
    return r.pfcount(key)

def get_unique_visitors_range(page: str, start: date, end: date) -> int:
    """Estimate distinct visitors across a span of days.

    PFCOUNT over several HyperLogLogs unions them server-side, so a
    visitor seen on multiple days still counts once.
    """
    daily_keys = [f"visitors:{page}:{d}" for d in date_range(start, end)]
    return r.pfcount(*daily_keys)

HyperLogLog uses ~12KB regardless of cardinality, with ~0.81% error rate.

Caching Best Practices

Use Consistent Key Naming

1
2
3
4
5
6
7
# Pattern: {type}:{id}:{subtype} — colon-delimited, most general part
# first, so related keys sort together and a wildcard like user:123:*
# matches one record's sub-keys.
"user:123"
"user:123:settings"
"user:123:sessions"
"product:456"
"product:456:reviews"
"cache:api:users:list:page:1"

Handle Cache Failures Gracefully

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def get_cached_data(key: str, fetch_func):
    """Read-through cache that degrades gracefully when Redis is down.

    Tries the cache first; on any Redis failure it logs the error and
    falls back to `fetch_func`. The write-back is likewise best-effort —
    a failed cache write must never break the request path.

    Args:
        key: Cache key.
        fetch_func: Zero-argument callable producing the authoritative data.
    """
    try:
        cached = r.get(key)
        if cached:
            return json.loads(cached)
    except redis.RedisError:
        # Bug fix: the original comment said "Log error" but swallowed it
        # silently, hiding outages. Log it, then fall through to the source.
        logging.getLogger(__name__).warning(
            "cache read failed for %s", key, exc_info=True)

    # Fetch from the source of truth.
    data = fetch_func()

    try:
        r.setex(key, 3600, json.dumps(data))
    except redis.RedisError:
        # Best-effort write-back; log so repeated failures are visible.
        logging.getLogger(__name__).warning(
            "cache write failed for %s", key, exc_info=True)

    return data

Prevent Cache Stampede

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def get_with_lock(key: str, fetch_func, ttl: int = 3600):
    """Read-through cache with a mutex to prevent cache stampede.

    On a miss, only the process that wins the lock recomputes; the rest
    poll until the value appears (or the lock frees up).

    Args:
        key: Cache key.
        fetch_func: Zero-argument callable producing the value on a miss.
        ttl: Cache TTL in seconds.
    """
    lock_key = f"lock:{key}"

    # Bug fix: the original retried by unbounded recursion, which could
    # exhaust the stack under sustained contention; loop instead.
    while True:
        cached = r.get(key)
        if cached:
            return json.loads(cached)

        # ex=30 bounds how long a crashed fetcher can block everyone else.
        if r.set(lock_key, "1", nx=True, ex=30):
            try:
                # Double-check: another process may have populated the
                # cache between our miss and winning the lock.
                cached = r.get(key)
                if cached:
                    return json.loads(cached)

                data = fetch_func()
                r.setex(key, ttl, json.dumps(data))
                return data
            finally:
                r.delete(lock_key)

        # Someone else is fetching — back off briefly, then re-check.
        time.sleep(0.1)

Quick Reference

| Pattern | Use Case | Data Structure |
| --- | --- | --- |
| Simple cache | Key-value storage | STRING |
| Sessions | User session data | HASH |
| Rate limiting | API throttling | STRING/ZSET |
| Leaderboards | Rankings | ZSET |
| Unique counts | Analytics | HyperLogLog |
| Real-time events | Notifications | Pub/Sub |
| Distributed locks | Coordination | STRING with NX |
| Tag invalidation | Cache groups | SET |

Redis is fast because it’s in-memory, but its real power comes from choosing the right data structure for your problem. A sorted set for leaderboards, HyperLogLog for unique counts, pub/sub for real-time — these patterns let you solve problems that would be complex or slow with a traditional database.