Redis is often introduced as “a cache,” but that undersells it. Here are patterns that leverage Redis for rate limiting, sessions, queues, and real-time features.

Pattern 1: Rate Limiting

The sliding window approach:

import redis
import time

r = redis.Redis()

def is_rate_limited(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Return True if `user_id` has exceeded `limit` requests in the last `window` seconds.

    Sliding-window limiter backed by a Redis sorted set: each request is a
    member scored by its arrival time, so the window slides continuously
    instead of resetting on fixed bucket boundaries.
    """
    key = f"ratelimit:{user_id}"
    now = time.time()

    # Bug fix: the member must be unique per request. Using str(now) meant
    # two requests landing on the same float timestamp collapsed into one
    # ZADD member and were undercounted. time_ns() is unique enough within
    # a process; NOTE(review): across processes a uuid suffix would be safer.
    member = str(time.time_ns())

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, now - window)  # Drop entries older than the window
    pipe.zadd(key, {member: now})                # Record this request
    pipe.zcard(key)                              # Count requests still in window
    pipe.expire(key, window)                     # GC the key if the user goes quiet
    results = pipe.execute()

    # NOTE: rejected requests are still recorded, so a client hammering the
    # endpoint keeps itself limited — intentional here, preserved from original.
    request_count = results[2]
    return request_count > limit

Using a sorted set with timestamps gives you a true sliding window, not just fixed buckets.

Pattern 2: Distributed Locks

When you need exactly-once execution across multiple servers:

import uuid

def acquire_lock(lock_name: str, timeout: int = 10) -> str | None:
    """Try to take the named lock; return an ownership token, or None if held."""
    token = str(uuid.uuid4())
    key = f"lock:{lock_name}"
    # NX: only the first caller wins. EX: the lock self-destructs after
    # `timeout` seconds so a crashed holder can't deadlock everyone else.
    if r.set(key, token, nx=True, ex=timeout):
        return token
    return None

def release_lock(lock_name: str, lock_id: str) -> bool:
    """Delete the lock, but only if `lock_id` proves we still own it."""
    # Atomic compare-and-delete: GET + DEL run as one Lua script, so another
    # client can't acquire the lock between our check and our delete.
    script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    deleted = r.eval(script, 1, f"lock:{lock_name}", lock_id)
    return deleted == 1

The Lua script ensures atomic check-and-delete. Without it, you risk releasing someone else’s lock.

Pattern 3: Session Storage

Better than file-based sessions:

import json
from datetime import timedelta

def save_session(session_id: str, data: dict, ttl: int = 3600):
    """Store session `data` as JSON with a `ttl`-second expiry."""
    payload = json.dumps(data)
    r.setex(f"session:{session_id}", ttl, payload)

def get_session(session_id: str, ttl: int = 3600) -> dict | None:
    """Fetch a session and refresh its expiry (sliding expiration).

    Fix: the extend-on-access TTL was hard-coded to 3600 even though
    save_session takes a `ttl` parameter. It is now a parameter with the
    same default, so callers using a custom TTL can keep the two in sync.
    Returns the session dict, or None if it doesn't exist / has expired.
    """
    key = f"session:{session_id}"
    data = r.get(key)
    if data:
        r.expire(key, ttl)  # Sliding expiration: active sessions stay alive
        return json.loads(data)
    return None

def destroy_session(session_id: str):
    """Remove the session immediately (e.g. on logout)."""
    key = f"session:{session_id}"
    r.delete(key)

Session data survives server restarts. Multiple app servers share state. TTL handles cleanup.

Pattern 4: Leaderboards

Sorted sets were made for this:

def add_score(leaderboard: str, user_id: str, score: int):
    """Insert or update `user_id`'s score on the leaderboard."""
    mapping = {user_id: score}
    r.zadd(f"lb:{leaderboard}", mapping)

def get_rank(leaderboard: str, user_id: str) -> int | None:
    """0-indexed rank, lowest score = rank 0; None if user is absent.

    NOTE(review): this is ascending rank (ZRANK), while get_top_n and
    get_around_user rank highest-first (ZREV*) — confirm which direction
    callers expect before relying on both.
    """
    return r.zrank(f"lb:{leaderboard}", user_id)

def get_top_n(leaderboard: str, n: int = 10) -> list:
    """Top `n` entries as [(user_id, score), ...], highest score first."""
    key = f"lb:{leaderboard}"
    return r.zrevrange(key, 0, n - 1, withscores=True)

def get_around_user(leaderboard: str, user_id: str, count: int = 5) -> list:
    """Slice of the leaderboard centred on `user_id` (±`count` places)."""
    key = f"lb:{leaderboard}"
    rank = r.zrevrank(key, user_id)
    if rank is None:
        return []  # user isn't on this board
    lo = max(0, rank - count)
    hi = rank + count
    return r.zrevrange(key, lo, hi, withscores=True)

All operations are O(log N). Handles millions of users.

Pattern 5: Pub/Sub for Real-Time

# Publisher
def broadcast_event(channel: str, event: dict):
    """Fire-and-forget: every currently-connected subscriber receives the event."""
    payload = json.dumps(event)
    r.publish(channel, payload)

# Subscriber (separate process)
def listen_for_events(channel: str):
    """Block forever, dispatching each published event to handle_event."""
    pubsub = r.pubsub()
    pubsub.subscribe(channel)

    # listen() also yields subscribe/unsubscribe confirmations; only
    # entries of type 'message' carry a payload we published.
    for message in pubsub.listen():
        if message['type'] != 'message':
            continue
        handle_event(json.loads(message['data']))

Good for: chat, notifications, live updates. Not good for: reliable message delivery (use Streams instead).

Pattern 6: Simple Job Queue

Redis Lists as queues:

def enqueue_job(queue: str, job: dict):
    """Push `job` on the left end; workers pop from the right, giving FIFO order."""
    payload = json.dumps(job)
    r.lpush(f"queue:{queue}", payload)

def dequeue_job(queue: str, timeout: int = 0) -> dict | None:
    """Blocking pop. Returns None on timeout (timeout=0 blocks forever)."""
    popped = r.brpop(f"queue:{queue}", timeout=timeout)
    if popped is None:
        return None
    _key, raw = popped  # BRPOP returns (key, value)
    return json.loads(raw)

# Worker
while True:
    # With the default timeout=0 the pop blocks until work arrives,
    # so the truthiness check only matters if a timeout is configured.
    if job := dequeue_job("emails"):
        send_email(job)

For production, consider Redis Streams or a proper queue (RabbitMQ, SQS).

Pattern 7: Counting Unique Items

HyperLogLog for approximate counts:

def track_visitor(page: str, visitor_id: str):
    """Register a visit; repeat visitors are absorbed by the HyperLogLog."""
    key = f"visitors:{page}"
    r.pfadd(key, visitor_id)

def get_unique_visitors(page: str) -> int:
    """Approximate count of distinct visitors to one page."""
    key = f"visitors:{page}"
    return r.pfcount(key)

def get_unique_across_pages(pages: list[str]) -> int:
    """Approximate distinct visitors over the union of several pages."""
    return r.pfcount(*(f"visitors:{p}" for p in pages))

12KB memory per counter, regardless of cardinality. ~0.81% error rate. Perfect for analytics.

Pattern 8: Caching with Cache-Aside

The classic pattern, done right:

def get_user(user_id: str) -> dict | None:
    """Cache-aside read: serve from Redis, fall back to the DB and backfill.

    Returns the user row as a dict, or None when the user doesn't exist.
    NOTE(review): `random` is used below but never imported in this article —
    add `import random` at the top.
    """
    cache_key = f"user:{user_id}"

    # Try cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # Cache miss: fetch from DB
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # Bug fix: don't cache a missing user. The old code stored 'null';
    # since b'null' is truthy, later "cache hits" json.loads'd it back
    # to None and silently returned None from a function typed -> dict.
    if user is None:
        return None

    # Populate cache with jittered TTL: the random spread keeps many keys
    # from expiring in the same instant (cache stampede prevention).
    ttl = 3600 + random.randint(0, 300)
    r.setex(cache_key, ttl, json.dumps(user))

    return user

def invalidate_user(user_id: str):
    """Drop the cached copy so the next get_user() re-reads the DB."""
    cache_key = f"user:{user_id}"
    r.delete(cache_key)

The random TTL jitter prevents cache stampedes when many keys expire simultaneously.

Connection Pooling

Don’t create a new connection per request:

# Do this once at startup
# A shared pool reuses sockets instead of paying a new TCP handshake per
# request, and max_connections caps how many the app can hold open.
# decode_responses=True makes the client return str instead of bytes.
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=50,
    decode_responses=True
)
r = redis.Redis(connection_pool=pool)

# Use `r` everywhere

Key Naming Convention

{object-type}:{id}:{attribute} — e.g. user:12345:profile, session:abc123, cache:user:12345, ratelimit:client:6789

Colons are convention, not requirement. But consistency matters.

Memory Management

# Check memory usage
redis-cli INFO memory

# Find big keys
redis-cli --bigkeys

# Set memory limit
# NOTE: CONFIG SET is a Redis command, not a shell command — run these
# two inside an interactive redis-cli session.
CONFIG SET maxmemory 2gb
CONFIG SET maxmemory-policy allkeys-lru

allkeys-lru evicts least-recently-used keys when memory is full. Good for caches.

Redis is a database that happens to be fast, not just a cache that happens to persist. Use it accordingly.