Redis gets introduced as a cache, but that undersells it. It’s an in-memory data structure server with atomic operations, pub/sub, streams, and more.

These patterns show Redis’s real power.

Basic Caching (The Familiar One)

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user(user_id):
    # Check cache first
    cached = r.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)
    
    # Miss: fetch from the database (db stands in for your DB client)
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    
    # Cache with TTL
    r.setex(f"user:{user_id}", 3600, json.dumps(user))
    
    return user
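
One gap in this sketch: invalidation. Stale data lingers until the TTL expires. A minimal fix is to delete the key on every write so the next read repopulates it (a sketch; save_user and db.execute are stand-ins):

def save_user(user_id, name):
    db.execute("UPDATE users SET name = %s WHERE id = %s", name, user_id)
    # Drop the cached copy; the next get_user() repopulates it
    r.delete(f"user:{user_id}")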

Rate Limiting

Sliding window rate limiter with sorted sets:

import time

def is_rate_limited(user_id, limit=100, window=60):
    key = f"ratelimit:{user_id}"
    now = time.time()
    window_start = now - window
    
    pipe = r.pipeline()
    
    # Remove old entries
    pipe.zremrangebyscore(key, 0, window_start)
    
    # Count requests in window
    pipe.zcard(key)
    
    # Add current request
    pipe.zadd(key, {str(now): now})
    
    # Set expiry
    pipe.expire(key, window)
    
    results = pipe.execute()
    request_count = results[1]
    
    return request_count >= limit

Or, more simply, with fixed windows:

def check_rate_limit(user_id, limit=100):
    key = f"ratelimit:{user_id}:{int(time.time() // 60)}"
    
    current = r.incr(key)
    if current == 1:
        r.expire(key, 60)
    
    return current > limit
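
One caveat with the version above: if the client dies between the incr and the expire, the key is left with no TTL and counts forever. Wrapping both commands in a pipeline (transactional by default in redis-py) closes that gap; a small sketch:

def check_rate_limit_safe(user_id, limit=100):
    key = f"ratelimit:{user_id}:{int(time.time() // 60)}"
    
    # INCR and EXPIRE run in one MULTI/EXEC, so the key
    # can never exist without an expiry
    pipe = r.pipeline()
    pipe.incr(key)
    pipe.expire(key, 60)
    current, _ = pipe.execute()
    
    return current > limit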

Session Storage

import secrets

def create_session(user_id, data):
    session_id = secrets.token_urlsafe(32)
    key = f"session:{session_id}"
    
    r.hset(key, mapping={
        "user_id": user_id,
        "created_at": time.time(),
        **data
    })
    r.expire(key, 86400)  # 24 hours
    
    return session_id

def get_session(session_id):
    return r.hgetall(f"session:{session_id}")

def destroy_session(session_id):
    r.delete(f"session:{session_id}")
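
The TTL above is fixed at creation, so even active users get logged out after 24 hours. For sliding expiry, refresh it on each authenticated request (touch_session is a made-up helper):

def touch_session(session_id, ttl=86400):
    # Push the expiry forward so the session ends 24 hours
    # after the last request, not the first
    r.expire(f"session:{session_id}", ttl)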

Distributed Locks

Prevent concurrent operations:

import uuid

def acquire_lock(resource, timeout=10):
    lock_id = str(uuid.uuid4())
    key = f"lock:{resource}"
    
    # SET NX with expiry (atomic)
    acquired = r.set(key, lock_id, nx=True, ex=timeout)
    
    return lock_id if acquired else None

def release_lock(resource, lock_id):
    key = f"lock:{resource}"
    
    # Only release if we own the lock (Lua for atomicity)
    script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    r.eval(script, 1, key, lock_id)

# Usage
lock = acquire_lock("user:123:update")
if lock:
    try:
        # Do exclusive work
        update_user(123)
    finally:
        release_lock("user:123:update", lock)
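
The try/finally dance is easy to forget, so it's worth wrapping in a context manager. A minimal sketch built on the two functions above (the redis_lock name and the raise-on-contention behavior are choices, not part of the pattern):

from contextlib import contextmanager

@contextmanager
def redis_lock(resource, timeout=10):
    lock_id = acquire_lock(resource, timeout)
    if lock_id is None:
        raise RuntimeError(f"lock busy: {resource}")
    try:
        yield
    finally:
        release_lock(resource, lock_id)

with redis_lock("user:123:update"):
    update_user(123)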

Job Queue

Simple queue with lists:

def enqueue(queue_name, job):
    r.lpush(f"queue:{queue_name}", json.dumps(job))

def dequeue(queue_name, timeout=0):
    # BRPOP blocks until item available
    result = r.brpop(f"queue:{queue_name}", timeout=timeout)
    if result:
        return json.loads(result[1])
    return None

# Producer
enqueue("emails", {"to": "user@example.com", "template": "welcome"})

# Worker
while True:
    job = dequeue("emails", timeout=5)
    if job:
        send_email(job)

With reliability (the job isn't lost if a worker crashes):

def reliable_dequeue(queue_name, processing_queue):
    # Move from queue to processing atomically
    job = r.brpoplpush(
        f"queue:{queue_name}",
        f"queue:{processing_queue}",
        timeout=5
    )
    return json.loads(job) if job else None

def complete_job(processing_queue, job):
    r.lrem(f"queue:{processing_queue}", 1, json.dumps(job))
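
The other half of reliability is recovery: if a worker dies mid-job, the payload sits in the processing queue forever. A rough sketch that drains it back on startup (assumes nothing else is running; with live workers this would also requeue in-flight jobs, which is why production setups use per-worker processing queues or timestamps):

def requeue_stale(queue_name, processing_queue):
    # Move everything left in the processing queue back onto
    # the main queue so another worker can pick it up
    while r.rpoplpush(f"queue:{processing_queue}", f"queue:{queue_name}"):
        pass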

Pub/Sub

Real-time messaging:

# Publisher
def publish_event(channel, event):
    r.publish(channel, json.dumps(event))

publish_event("user_updates", {"user_id": 123, "action": "login"})

# Subscriber
def subscribe(channels):
    pubsub = r.pubsub()
    pubsub.subscribe(*channels)
    
    for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            handle_event(message["channel"], data)

# Run subscriber in separate process/thread
subscribe(["user_updates", "system_alerts"])
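
One caveat worth knowing: Pub/Sub is fire-and-forget. Messages published while a subscriber is disconnected are simply gone. If you need replay or delivery guarantees, reach for Streams (below).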

Leaderboards

Sorted sets for rankings:

def add_score(leaderboard, user_id, score):
    r.zadd(f"leaderboard:{leaderboard}", {user_id: score})

def increment_score(leaderboard, user_id, delta):
    r.zincrby(f"leaderboard:{leaderboard}", delta, user_id)

def get_rank(leaderboard, user_id):
    # 0-indexed, reverse for highest-first
    rank = r.zrevrank(f"leaderboard:{leaderboard}", user_id)
    return rank + 1 if rank is not None else None

def get_top(leaderboard, count=10):
    return r.zrevrange(
        f"leaderboard:{leaderboard}",
        0, count - 1,
        withscores=True
    )

# Usage
add_score("weekly", "user:123", 1500)
increment_score("weekly", "user:123", 50)
print(get_rank("weekly", "user:123"))  # 3
print(get_top("weekly", 10))  # Top 10 with scores
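
A common companion view is "players around me", which falls out of the same sorted set (get_around is a made-up helper):

def get_around(leaderboard, user_id, spread=2):
    rank = r.zrevrank(f"leaderboard:{leaderboard}", user_id)
    if rank is None:
        return []
    # A window of neighbors on either side, highest first
    return r.zrevrange(
        f"leaderboard:{leaderboard}",
        max(rank - spread, 0), rank + spread,
        withscores=True
    )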

Counting Unique Items

HyperLogLog for cardinality estimation:

from datetime import date

def today():
    return date.today().isoformat()  # e.g. "2024-01-15"

def track_visitor(page, visitor_id):
    r.pfadd(f"visitors:{page}:{today()}", visitor_id)

def unique_visitors(page, date):
    return r.pfcount(f"visitors:{page}:{date}")

# Track millions of visitors with ~12KB memory per counter
track_visitor("/home", "user_abc")
track_visitor("/home", "user_xyz")
track_visitor("/home", "user_abc")  # Duplicate

print(unique_visitors("/home", "2024-01-15"))  # ~2
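
Per-day keys also roll up: PFCOUNT accepts multiple keys and returns the cardinality of their union, so weekly uniques need no extra bookkeeping (unique_visitors_range is a made-up helper):

def unique_visitors_range(page, dates):
    # Union across days without double-counting repeat visitors
    return r.pfcount(*[f"visitors:{page}:{d}" for d in dates])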

Geospatial

Location-based queries:

def add_location(key, name, longitude, latitude):
    r.geoadd(key, (longitude, latitude, name))

def nearby(key, longitude, latitude, radius_km):
    return r.geosearch(
        key,
        longitude=longitude,
        latitude=latitude,
        radius=radius_km,
        unit="km",
        withdist=True
    )

# Store locations
add_location("stores", "store_1", -122.4194, 37.7749)
add_location("stores", "store_2", -122.4089, 37.7851)

# Find nearby
results = nearby("stores", -122.41, 37.78, 5)
# [('store_2', 0.5), ('store_1', 1.2)]
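
The same key also answers member-to-member distance via GEODIST (distance_km is a made-up helper):

def distance_km(key, place_a, place_b):
    # Great-circle distance between two stored members, in km
    return r.geodist(key, place_a, place_b, unit="km")

distance_km("stores", "store_1", "store_2")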

Streams (Event Log)

Persistent, multi-consumer event log:

def add_event(stream, event):
    return r.xadd(stream, event)

def read_events(stream, last_id="0"):
    return r.xread({stream: last_id}, count=100, block=5000)

def create_consumer_group(stream, group):
    try:
        r.xgroup_create(stream, group, id="0", mkstream=True)
    except redis.ResponseError:
        pass  # Group exists

def consume(stream, group, consumer):
    return r.xreadgroup(
        group, consumer,
        {stream: ">"},
        count=10,
        block=5000
    )

def ack(stream, group, message_id):
    r.xack(stream, group, message_id)

# Usage
add_event("orders", {"user": "123", "item": "widget", "qty": "2"})

create_consumer_group("orders", "processors")
events = consume("orders", "processors", "worker-1")
for stream, messages in events:
    for msg_id, data in messages:
        process_order(data)
        ack("orders", "processors", msg_id)

Best Practices

Use pipelines for batching:

pipe = r.pipeline()
for user_id in user_ids:
    pipe.get(f"user:{user_id}")
results = pipe.execute()

Set memory limits:

# redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru

Use appropriate data structures:

  • Strings: simple values, counters
  • Hashes: objects with fields
  • Lists: queues, recent items
  • Sets: unique collections, tags
  • Sorted Sets: rankings, time-series
  • Streams: event logs
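
As a quick taste of each (keys and values are arbitrary):

r.incr("pageviews")                        # String as a counter
r.hset("user:1", mapping={"name": "Ada"})  # Hash of fields
r.lpush("recent:posts", "post:42")         # List, newest first
r.sadd("post:42:tags", "redis", "cache")   # Set of unique tags
r.zadd("scores", {"ada": 99})              # Sorted set, ranked by score
r.xadd("events", {"type": "signup"})       # Stream entry with auto ID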

Redis is a Swiss Army knife. Caching is just the blade everyone knows about. Rate limiting, sessions, locks, queues, pub/sub, leaderboards — they’re all built in, battle-tested, and fast.

Learn the data structures. Match them to your problems. Redis probably has a primitive that fits.