Redis is often introduced as “just a cache,” but it’s a versatile data structure server. These patterns unlock its full potential.

Connection Basics

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Connect to a server: -h gives the host, -p the port
redis-cli -h localhost -p 6379

# With password (passed via -a)
# NOTE(review): a password on the command line is likely visible in shell
# history and process listings - confirm a safer option for real deployments.
redis-cli -h localhost -p 6379 -a yourpassword

# Select database (0-15); this switches the connection to database 1
SELECT 1

# Check connectivity
PING

Caching Patterns

Basic Cache with TTL

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Set with expiration (seconds): 3600 s is one hour
SET user:123:profile '{"name":"Alice"}' EX 3600

# Set with expiration (milliseconds): 86400000 ms is 24 hours
SET session:abc123 '{"user_id":123}' PX 86400000

# Set only if not exists
# Note: unlike the two commands above, this one attaches no expiration.
# NOTE(review): the Redis docs list SETNX as deprecated in favour of
# `SET key value NX` - confirm before copying into new code.
SETNX cache:key "value"

# Set only if exists (update)
SET cache:key "newvalue" XX

Cache-Aside Pattern

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def get_user(user_id):
    """Return the user record for ``user_id`` using the cache-aside pattern.

    The cache is consulted first. On a miss the row is loaded from the
    database and written back to the cache as JSON with a one-hour TTL.
    """
    cache_key = f"user:{user_id}"

    # Fast path: serve the JSON document straight from the cache.
    hit = redis.get(cache_key)
    if hit:
        return json.loads(hit)

    # Cache miss - load from the database, then populate the cache.
    record = db.query("SELECT * FROM users WHERE id = %s", user_id)
    redis.setex(cache_key, 3600, json.dumps(record))
    return record

Write-Through Pattern

1
2
3
4
5
6
def update_user(user_id, data):
    """Persist a user change and refresh its cache entry (write-through).

    The database is written first; the cache entry for the user is then
    overwritten with ``data`` serialised as JSON, using a one-hour TTL.
    """
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)

    # Overwrite the cached copy right away so readers see the new data.
    cache_key = f"user:{user_id}"
    serialized = json.dumps(data)
    redis.setex(cache_key, 3600, serialized)

Cache Stampede Prevention

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def get_with_lock(key, fetch_func, ttl=3600, lock_ttl=10):
    """Return the cached JSON value for ``key``, rebuilding it under a lock.

    On a cache miss only the caller that wins ``lock:<key>`` runs
    ``fetch_func``; every other caller polls every 100 ms until the value
    shows up in the cache or the lock becomes free.

    Args:
        key: Cache key to read and populate.
        fetch_func: Zero-argument callable producing a JSON-serialisable value.
        ttl: Lifetime of the cached value, in seconds.
        lock_ttl: Lifetime of the rebuild lock, in seconds, so that a crashed
            holder cannot block the other callers forever.

    Returns:
        The decoded cached value, or the fresh result of ``fetch_func``.
    """
    import uuid  # function-scope import keeps this snippet self-contained

    # Delete the lock only while it still carries our token. A plain DEL
    # could remove a lock that expired and was re-acquired by someone else
    # while fetch_func() was still running.
    release_script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    lock_key = f"lock:{key}"
    token = str(uuid.uuid4())

    # Loop rather than recurse: a long wait must not grow the call stack
    # until it hits the interpreter's recursion limit.
    while True:
        value = redis.get(key)
        if value:
            return json.loads(value)

        # Try to acquire the lock
        if redis.set(lock_key, token, nx=True, ex=lock_ttl):
            try:
                value = fetch_func()
                redis.setex(key, ttl, json.dumps(value))
                return value
            finally:
                redis.eval(release_script, 1, lock_key, token)

        # Another process is fetching; wait and check the cache again.
        time.sleep(0.1)

Session Storage

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import secrets

def create_session(user_id, ttl=86400):
    """Create a session for ``user_id`` and return its random identifier.

    The session document (user id plus creation timestamp) is stored as JSON
    under ``session:<id>`` and expires after ``ttl`` seconds.
    """
    session_id = secrets.token_urlsafe(32)
    payload = json.dumps({"user_id": user_id, "created_at": time.time()})
    redis.setex(f"session:{session_id}", ttl, payload)
    return session_id

def get_session(session_id):
    """Return the decoded session document, or ``None`` when nothing is stored."""
    raw = redis.get(f"session:{session_id}")
    if not raw:
        return None
    return json.loads(raw)

def extend_session(session_id, ttl=86400):
    """Reset the session key's time-to-live to ``ttl`` seconds."""
    session_key = f"session:{session_id}"
    redis.expire(session_key, ttl)

def destroy_session(session_id):
    """Delete the session key, ending the session immediately."""
    session_key = f"session:{session_id}"
    redis.delete(session_key)

Rate Limiting

Fixed Window

1
2
3
4
5
6
7
8
def is_rate_limited(user_id, limit=100, window=60):
    """Fixed-window rate limiter: return ``True`` once ``limit`` is exceeded.

    Each call increments a per-user counter whose key embeds the index of the
    current ``window``-second window, so a new window starts from zero.

    Args:
        user_id: Identifier the limit applies to.
        limit: Maximum number of calls allowed per window.
        window: Window length in seconds.

    Returns:
        ``True`` when this call pushes the counter above ``limit``.
    """
    key = f"ratelimit:{user_id}:{int(time.time() // window)}"

    # Send INCR and EXPIRE together in one pipeline. Issuing EXPIRE as a
    # separate call only when the counter is 1 leaves an immortal key if the
    # process dies between the two commands. Refreshing the TTL on every hit
    # is harmless because the key is specific to this window.
    pipe = redis.pipeline()
    pipe.incr(key)
    pipe.expire(key, window)
    current, _ = pipe.execute()

    return current > limit

Sliding Window with Sorted Sets

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def is_rate_limited_sliding(user_id, limit=100, window=60):
    """Sliding-window rate limiter backed by a sorted set.

    Every request is recorded as a sorted-set member scored by its timestamp.
    Entries older than ``window`` seconds are trimmed before counting.

    Args:
        user_id: Identifier the limit applies to.
        limit: Maximum number of requests allowed within the window.
        window: Window length in seconds.

    Returns:
        ``True`` when the number of requests in the window, including this
        one, exceeds ``limit``.
    """
    import uuid  # function-scope import keeps this snippet self-contained

    key = f"ratelimit:{user_id}"
    now = time.time()
    window_start = now - window

    # The member must be unique per request. Using the bare timestamp makes
    # two requests with the same clock reading collapse into one entry, so
    # bursts would be under-counted.
    member = f"{now}:{uuid.uuid4().hex}"

    pipe = redis.pipeline()
    # Remove old entries
    pipe.zremrangebyscore(key, 0, window_start)
    # Add current request
    pipe.zadd(key, {member: now})
    # Count requests in window
    pipe.zcard(key)
    # Set expiration so idle users do not leave keys behind
    pipe.expire(key, window)

    results = pipe.execute()
    request_count = results[2]  # reply of the ZCARD queued third

    return request_count > limit

Token Bucket

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def check_token_bucket(user_id, capacity=10, refill_rate=1):
    """Token-bucket rate limiter: try to consume one token for ``user_id``.

    The bucket state (``tokens``, ``last_update``) lives in the hash
    ``bucket:<user_id>``. Refill and consumption run inside one Lua script,
    so concurrent callers cannot both read the same token count and
    overspend the bucket. The script also avoids depending on whether the
    client returns hash fields as bytes or as str.

    Args:
        user_id: Identifier the bucket belongs to.
        capacity: Maximum number of tokens the bucket can hold.
        refill_rate: Tokens added per second; must be positive.

    Returns:
        ``True`` if a token was consumed, ``False`` if the bucket is empty.

    Raises:
        ValueError: If ``refill_rate`` is not positive.
    """
    if refill_rate <= 0:
        raise ValueError("refill_rate must be positive")

    script = """
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])

    local state = redis.call('HMGET', KEYS[1], 'tokens', 'last_update')
    local tokens = capacity
    if state[1] and state[2] then
        local elapsed = math.max(0, now - tonumber(state[2]))
        tokens = math.min(capacity, tonumber(state[1]) + elapsed * refill_rate)
    end

    if tokens < 1 then
        return 0
    end

    redis.call('HSET', KEYS[1], 'tokens', tostring(tokens - 1), 'last_update', ARGV[3])
    redis.call('EXPIRE', KEYS[1], ARGV[4])
    return 1
    """
    key = f"bucket:{user_id}"
    now = time.time()
    # After capacity / refill_rate seconds an untouched bucket is full again,
    # so its state can expire and be recreated at full capacity.
    state_ttl = int(capacity / refill_rate) + 1

    allowed = redis.eval(script, 1, key, capacity, refill_rate, now, state_ttl)
    return allowed == 1

Queues and Pub/Sub

Simple Queue with Lists

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Producer
def enqueue(queue_name, message):
    """Serialise ``message`` to JSON and LPUSH it onto ``queue_name``."""
    payload = json.dumps(message)
    redis.lpush(queue_name, payload)

# Consumer (blocking)
def dequeue(queue_name, timeout=0):
    """Pop the next message from ``queue_name`` with BRPOP and decode it.

    Returns the decoded message, or ``None`` when BRPOP yields nothing.
    NOTE(review): timeout=0 presumably means "wait indefinitely" - confirm
    against the client documentation.
    """
    popped = redis.brpop(queue_name, timeout)
    if not popped:
        return None
    # The payload sits at index 1 of the reply.
    return json.loads(popped[1])

Reliable Queue with RPOPLPUSH

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def reliable_dequeue(queue_name, processing_queue):
    """Move one item from ``queue_name`` to ``processing_queue`` and decode it.

    The move is a single RPOPLPUSH, so the item is never absent from both
    lists. Returns ``None`` when there was nothing to move.
    """
    raw = redis.rpoplpush(queue_name, processing_queue)
    if not raw:
        return None
    return json.loads(raw)

def ack(processing_queue, item):
    """Acknowledge ``item`` by removing one matching entry from the processing list.

    The entry is matched on its JSON text, so ``item`` has to serialise to
    the same string that was stored.
    """
    encoded = json.dumps(item)
    redis.lrem(processing_queue, 1, encoded)

def requeue_failed(processing_queue, queue_name):
    """Drain ``processing_queue`` back into ``queue_name``, one item at a time."""
    # Each RPOPLPUSH moves a single item; a falsy reply ends the loop.
    while redis.rpoplpush(processing_queue, queue_name):
        pass

Pub/Sub

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# Publisher
def publish_event(channel, event):
    """Publish ``event`` on ``channel`` as a JSON document."""
    payload = json.dumps(event)
    redis.publish(channel, payload)

# Subscriber
def subscribe(channel, callback):
    """Listen on ``channel`` and pass every decoded message to ``callback``.

    The function iterates over the subscription's ``listen()`` stream and
    only returns if that stream ends.
    """
    listener = redis.pubsub()
    listener.subscribe(channel)

    for envelope in listener.listen():
        # Skip anything that is not an actual published message.
        if envelope['type'] != 'message':
            continue
        callback(json.loads(envelope['data']))

Leaderboards with Sorted Sets

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def add_score(leaderboard, user_id, score):
    """Store ``score`` for ``user_id`` in the ``leaderboard`` sorted set."""
    entry = {user_id: score}
    redis.zadd(leaderboard, entry)

def increment_score(leaderboard, user_id, amount):
    """Add ``amount`` to ``user_id``'s score in ``leaderboard`` via ZINCRBY."""
    redis.zincrby(leaderboard, amount, user_id)

def get_rank(leaderboard, user_id):
    """Return the 1-based rank of ``user_id`` (highest score first).

    Returns ``None`` when ZREVRANK reports no rank for the user.
    """
    position = redis.zrevrank(leaderboard, user_id)
    if position is None:
        return None
    # ZREVRANK is 0-indexed; ranks shown to users start at 1.
    return position + 1

def get_top(leaderboard, count=10):
    """Return the ``count`` highest-scoring entries with their scores.

    Args:
        leaderboard: Name of the sorted set.
        count: Number of entries to return. Zero or a negative number yields
            an empty list.

    Returns:
        The ZREVRANGE reply with scores, highest score first.
    """
    # A stop index of -1 means "through the last element", so without this
    # guard count=0 would return the entire leaderboard instead of nothing.
    if count <= 0:
        return []
    return redis.zrevrange(leaderboard, 0, count - 1, withscores=True)

def get_around_user(leaderboard, user_id, count=5):
    """Return the entries ranked up to ``count`` places above and below a user.

    Returns an empty list when the user has no rank. The slice is clamped at
    the top of the leaderboard.
    """
    position = redis.zrevrank(leaderboard, user_id)
    if position is None:
        return []

    # Never start before index 0.
    first = position - count if position > count else 0
    last = position + count
    return redis.zrevrange(leaderboard, first, last, withscores=True)

Distributed Locks

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import uuid

class RedisLock:
    """Single-instance distributed lock stored at ``lock:<key>``.

    Each instance owns a random token. The token is written with SET NX EX
    on acquire and checked on release, so one holder cannot release a lock
    that has since passed to another holder.

    Args:
        redis_client: Client exposing ``set(..., nx=, ex=)`` and ``eval``.
        key: Name of the protected resource.
        ttl: Lock lifetime in seconds; the lock expires on its own after that.
    """

    def __init__(self, redis_client, key, ttl=10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.ttl = ttl
        self.token = str(uuid.uuid4())

    def acquire(self, blocking=True, timeout=None):
        """Try to take the lock.

        Args:
            blocking: When false, make a single attempt and return.
            timeout: Maximum number of seconds to keep retrying. ``None``
                waits forever; ``0`` makes one attempt and gives up.

        Returns:
            ``True`` if the lock was acquired, ``False`` otherwise.
        """
        # Compare against None explicitly: timeout=0 is falsy but still means
        # "do not wait", not "wait forever".
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            if self.redis.set(self.key, self.token, nx=True, ex=self.ttl):
                return True
            if not blocking:
                return False

            pause = 0.1
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                # Do not sleep past the caller's deadline.
                pause = min(pause, remaining)
            time.sleep(pause)

    def release(self):
        """Release the lock if this instance still owns it.

        Returns:
            ``True`` if the key was deleted, ``False`` if it was missing or
            held under a different token.
        """
        # Only release if we own the lock; the compare and the delete run
        # inside one script so nothing can slip in between them.
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self.redis.eval(script, 1, self.key, self.token) == 1

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        # The return value is deliberately dropped so that exceptions raised
        # inside the ``with`` body are never suppressed.
        self.release()

# Usage: entering the `with` block acquires the lock and leaving it releases
# the lock, including when do_work() raises.
with RedisLock(redis, "my-resource"):
    # Critical section
    do_work()

Counting and Analytics

HyperLogLog for Unique Counts

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Count unique visitors (memory efficient)
def track_visitor(page, visitor_id):
    """Add ``visitor_id`` to today's HyperLogLog for ``page``."""
    today = date.today()
    redis.pfadd(f"visitors:{page}:{today}", visitor_id)

def get_unique_visitors(page, date):
    """Return the PFCOUNT estimate of unique visitors to ``page`` on ``date``."""
    hll_key = f"visitors:{page}:{date}"
    return redis.pfcount(hll_key)

# Merge multiple days
def get_weekly_uniques(page):
    """Return one PFCOUNT estimate across the daily keys from ``last_7_days()``.

    All the keys go into a single PFCOUNT call rather than being summed one
    by one.
    """
    daily_keys = [f"visitors:{page}:{day}" for day in last_7_days()]
    return redis.pfcount(*daily_keys)

Bitmaps for Daily Active Users

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def mark_active(user_id, date=None):
    """Set the bit for ``user_id`` in the daily-active bitmap of ``date``.

    Args:
        user_id: Integer used as the bit offset in the bitmap.
        date: Day to mark, as it should appear in the key; defaults to
            today's date in ISO format.
    """
    if not date:
        # The ``date`` parameter shadows the ``datetime.date`` class inside
        # this function, so ``date.today()`` would be called on ``None``.
        # Reach the class through its module instead.
        import datetime

        date = datetime.date.today().isoformat()
    redis.setbit(f"active:{date}", user_id, 1)

def was_active(user_id, date):
    """Return ``True`` when the bit for ``user_id`` is set in ``date``'s bitmap."""
    bitmap_key = f"active:{date}"
    bit = redis.getbit(bitmap_key, user_id)
    return bit == 1

def count_active(date):
    """Return the number of set bits in the bitmap for ``date``."""
    bitmap_key = f"active:{date}"
    return redis.bitcount(bitmap_key)

# Users active on multiple days
def active_all_days(dates):
    """Count the users whose bit is set on every day in ``dates``.

    The daily bitmaps are ANDed into a temporary key, which is counted and
    then removed.

    Args:
        dates: Iterable of day identifiers as they appear in the
            ``active:<date>`` keys.

    Returns:
        Number of bits set in the intersection; ``0`` when ``dates`` is empty.
    """
    import uuid  # function-scope import keeps this snippet self-contained

    keys = [f"active:{d}" for d in dates]
    if not keys:
        # With no days given there are no source keys to pass to BITOP.
        return 0

    # Use a unique scratch key per call: a fixed name would let concurrent
    # callers overwrite or delete each other's intermediate result.
    result_key = f"temp:active_intersection:{uuid.uuid4().hex}"
    try:
        redis.bitop("AND", result_key, *keys)
        return redis.bitcount(result_key)
    finally:
        # Clean up even if BITOP or BITCOUNT raised.
        redis.delete(result_key)

Expiration Strategies

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# Set TTL: EXPIRE takes a duration in seconds, EXPIREAT an absolute time
EXPIRE key 3600
EXPIREAT key 1735689600  # Unix timestamp (2025-01-01 00:00:00 UTC)

# Check TTL
TTL key  # Returns -1 if no expiry, -2 if doesn't exist

# Remove expiration
PERSIST key

# Set value and TTL atomically (3600 is the TTL in seconds)
SETEX key 3600 "value"

Lazy Expiration Pattern

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def get_with_soft_expire(key, ttl=3600, soft_ttl=300):
    """
    Returns cached value but triggers background refresh
    if within soft_ttl of expiration.

    Args:
        key: Cache key to read.
        ttl: Unused by this function; kept so existing callers keep working.
        soft_ttl: Refresh threshold in seconds. A refresh is scheduled when
            the key's remaining lifetime is below this value.

    Returns:
        The raw cached value, or ``None`` when the key is absent.
    """
    pipe = redis.pipeline()
    pipe.get(key)
    pipe.ttl(key)
    value, remaining_ttl = pipe.execute()

    # TTL reports -1 for a key without expiry and -2 for a missing key. Both
    # are "less than soft_ttl", so require a non-negative TTL; otherwise a
    # key that never expires would schedule a refresh on every read.
    if value is not None and 0 <= remaining_ttl < soft_ttl:
        # Trigger async refresh
        refresh_cache_async.delay(key)

    return value

Transactions and Lua Scripts

Pipeline (Batching)

1
2
3
4
# Queue 1000 SET commands on the pipeline object, then send them together.
pipe = redis.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
pipe.execute()  # Single round trip

Transaction with WATCH

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def transfer(from_account, to_account, amount):
    """Move ``amount`` between two integer balances with optimistic locking.

    Both keys are WATCHed, the source balance is checked, and the two
    updates are queued in a MULTI/EXEC transaction. If either key changes
    between the WATCH and the EXEC, the attempt is retried from the start.

    Args:
        from_account: Key holding the source balance.
        to_account: Key holding the destination balance.
        amount: Non-negative integer amount to move.

    Returns:
        ``True`` when the transfer was applied, ``False`` when the source
        balance is insufficient.

    Raises:
        ValueError: If ``amount`` is negative.
    """
    # ``redis`` is used as a client instance in this function
    # (``redis.pipeline()``), so the exception class is imported by name
    # rather than read from that object as ``redis.WatchError``.
    from redis.exceptions import WatchError

    # A negative amount would always pass the balance check below and move
    # funds in the opposite direction.
    if amount < 0:
        raise ValueError("amount must be non-negative")

    with redis.pipeline() as pipe:
        while True:
            try:
                # Watch for changes
                pipe.watch(from_account, to_account)

                from_balance = int(pipe.get(from_account) or 0)
                if from_balance < amount:
                    pipe.unwatch()
                    return False

                # Start transaction
                pipe.multi()
                pipe.decrby(from_account, amount)
                pipe.incrby(to_account, amount)
                pipe.execute()
                return True

            except WatchError:
                # Retry if watched keys changed
                continue

Lua Script (Atomic Operations)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Rate limiter as Lua script
# The script increments the counter at KEYS[1]. On the first hit (counter
# equals 1) it puts an expiry of ARGV[2] on the key. It returns 1 while the
# counter is within the limit ARGV[1] and 0 once the limit is exceeded.
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('INCR', key)
if current == 1 then
    redis.call('EXPIRE', key, window)
end

if current > limit then
    return 0
else
    return 1
end
"""

# Register the script once; the returned object is called with keys=/args=.
rate_limit = redis.register_script(RATE_LIMIT_SCRIPT)

def check_rate_limit(user_id, limit=100, window=60):
    """Return ``True`` while ``user_id`` is within ``limit`` for the current window.

    The counting is delegated to the registered ``rate_limit`` script, which
    replies 1 for an allowed request.
    """
    window_index = int(time.time() // window)
    verdict = rate_limit(
        keys=[f"ratelimit:{user_id}:{window_index}"],
        args=[limit, window],
    )
    return verdict == 1

Monitoring

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# Real-time commands
# NOTE(review): MONITOR streams every command the server handles and is
# reported to be costly - avoid leaving it running on a busy instance; confirm.
MONITOR

# Stats (INFO alone, or limited to one section such as memory or stats)
INFO
INFO memory
INFO stats

# Slow queries (the argument 10 caps how many entries are returned)
SLOWLOG GET 10

# Connected clients
CLIENT LIST

# Memory usage for a key
MEMORY USAGE mykey

Redis excels when you match the right data structure to your problem. Lists for queues, sorted sets for leaderboards, HyperLogLog for counting uniques—each has its sweet spot.

Start simple with basic caching, then graduate to these patterns as your needs grow.