Redis is often introduced as “just a cache,” but it’s a versatile data structure server. These patterns unlock its full potential.
## Connection Basics

```bash
# Connect
redis-cli -h localhost -p 6379

# With password
redis-cli -h localhost -p 6379 -a yourpassword

# Select database (0-15)
SELECT 1

# Check connectivity
PING
```

## Caching Patterns

### Basic Cache with TTL

```bash
# Set with expiration (seconds)
SET user:123:profile '{"name":"Alice"}' EX 3600

# Set with expiration (milliseconds)
SET session:abc123 '{"user_id":123}' PX 86400000

# Set only if not exists
SETNX cache:key "value"

# Set only if exists (update)
SET cache:key "newvalue" XX
```

### Cache-Aside Pattern

```python
def get_user(user_id):
    # Check cache first
    cached = redis.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)

    # Cache miss - fetch from database
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # Store in cache
    redis.setex(f"user:{user_id}", 3600, json.dumps(user))
    return user
```

### Write-Through Pattern

```python
def update_user(user_id, data):
    # Update database
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)

    # Update cache immediately
    redis.setex(f"user:{user_id}", 3600, json.dumps(data))
```

### Cache Stampede Prevention

```python
def get_with_lock(key, fetch_func, ttl=3600, lock_ttl=10):
    lock_key = f"lock:{key}"

    # Retry in a loop rather than by recursion: while another process keeps
    # holding the lock, each 0.1 s retry would otherwise add a stack frame
    # and a slow fetch could end in RecursionError.
    while True:
        value = redis.get(key)
        if value:
            return json.loads(value)

        # Try to acquire lock
        if redis.set(lock_key, "1", nx=True, ex=lock_ttl):
            try:
                value = fetch_func()
                redis.setex(key, ttl, json.dumps(value))
                return value
            finally:
                redis.delete(lock_key)

        # Another process is fetching, wait and retry
        time.sleep(0.1)
```

## Session Storage

```python
import secrets

def create_session(user_id, ttl=86400):
    session_id = secrets.token_urlsafe(32)
    session_data = {
        "user_id": user_id,
        "created_at": time.time()
    }
    redis.setex(f"session:{session_id}", ttl, json.dumps(session_data))
    return session_id

def get_session(session_id):
    data = redis.get(f"session:{session_id}")
    return json.loads(data) if data else None

def extend_session(session_id, ttl=86400):
    redis.expire(f"session:{session_id}", ttl)

def destroy_session(session_id):
    redis.delete(f"session:{session_id}")
```

## Rate Limiting

### Fixed Window

```python
def is_rate_limited(user_id, limit=100, window=60):
    key = f"ratelimit:{user_id}:{int(time.time() // window)}"
    current = redis.incr(key)

    if current == 1:
        redis.expire(key, window)

    return current > limit
```

### Sliding Window with Sorted Sets

```python
def is_rate_limited_sliding(user_id, limit=100, window=60):
    key = f"ratelimit:{user_id}"
    now = time.time()
    window_start = now - window

    pipe = redis.pipeline()
    # Remove old entries
    pipe.zremrangebyscore(key, 0, window_start)
    # Add current request
    pipe.zadd(key, {str(now): now})
    # Count requests in window
    pipe.zcard(key)
    # Set expiration
    pipe.expire(key, window)

    results = pipe.execute()
    request_count = results[2]

    return request_count > limit
```

### Token Bucket

```python
def check_token_bucket(user_id, capacity=10, refill_rate=1):
    key = f"bucket:{user_id}"
    now = time.time()

    # Get current state
    data = redis.hgetall(key)

    if data:
        tokens = float(data[b'tokens'])
        last_update = float(data[b'last_update'])

        # Refill tokens based on elapsed time
        elapsed = now - last_update
        tokens = min(capacity, tokens + elapsed * refill_rate)
    else:
        tokens = capacity

    if tokens >= 1:
        # Consume a token
        redis.hset(key, mapping={
            'tokens': tokens - 1,
            'last_update': now
        })
        redis.expire(key, int(capacity / refill_rate) + 1)
        return True

    return False
```

## Queues and Pub/Sub

### Simple Queue with Lists

```python
# Producer
def enqueue(queue_name, message):
    redis.lpush(queue_name, json.dumps(message))

# Consumer (blocking)
def dequeue(queue_name, timeout=0):
    result = redis.brpop(queue_name, timeout)
    if result:
        return json.loads(result[1])
    return None
```

### Reliable Queue with RPOPLPUSH

```python
def reliable_dequeue(queue_name, processing_queue):
    # Move item to processing queue atomically
    item = redis.rpoplpush(queue_name, processing_queue)
    return json.loads(item) if item else None

def ack(processing_queue, item):
    # Remove from processing queue when done
    redis.lrem(processing_queue, 1, json.dumps(item))

def requeue_failed(processing_queue, queue_name):
    # Move failed items back to main queue
    while True:
        item = redis.rpoplpush(processing_queue, queue_name)
        if not item:
            break
```

### Pub/Sub

```python
# Publisher
def publish_event(channel, event):
    redis.publish(channel, json.dumps(event))

# Subscriber
def subscribe(channel, callback):
    pubsub = redis.pubsub()
    pubsub.subscribe(channel)

    for message in pubsub.listen():
        if message['type'] == 'message':
            callback(json.loads(message['data']))
```

## Leaderboards with Sorted Sets

```python
def add_score(leaderboard, user_id, score):
    redis.zadd(leaderboard, {user_id: score})

def increment_score(leaderboard, user_id, amount):
    redis.zincrby(leaderboard, amount, user_id)

def get_rank(leaderboard, user_id):
    # 0-indexed, reverse order (highest first)
    rank = redis.zrevrank(leaderboard, user_id)
    return rank + 1 if rank is not None else None

def get_top(leaderboard, count=10):
    return redis.zrevrange(leaderboard, 0, count - 1, withscores=True)

def get_around_user(leaderboard, user_id, count=5):
    rank = redis.zrevrank(leaderboard, user_id)
    if rank is None:
        return []

    start = max(0, rank - count)
    end = rank + count
    return redis.zrevrange(leaderboard, start, end, withscores=True)
```

## Distributed Locks

```python
import uuid

class RedisLock:
    def __init__(self, redis_client, key, ttl=10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.ttl = ttl
        self.token = str(uuid.uuid4())

    def acquire(self, blocking=True, timeout=None):
        start = time.time()
        while True:
            if self.redis.set(self.key, self.token, nx=True, ex=self.ttl):
                return True

            if not blocking:
                return False

            if timeout and (time.time() - start) > timeout:
                return False

            time.sleep(0.1)

    def release(self):
        # Only release if we own the lock
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(script, 1, self.key, self.token)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()

# Usage
with RedisLock(redis, "my-resource"):
    # Critical section
    do_work()
```

## Counting and Analytics

### HyperLogLog for Unique Counts

```python
# Count unique visitors (memory efficient)
def track_visitor(page, visitor_id):
    redis.pfadd(f"visitors:{page}:{date.today()}", visitor_id)

def get_unique_visitors(page, date):
    return redis.pfcount(f"visitors:{page}:{date}")

# Merge multiple days
def get_weekly_uniques(page):
    keys = [f"visitors:{page}:{date}" for date in last_7_days()]
    return redis.pfcount(*keys)
```
### Bitmaps for Daily Active Users

```python
import datetime

def mark_active(user_id, date=None):
    # The `date` parameter shadows the datetime.date class inside this
    # function, so the default must be built via the datetime module;
    # calling date.today() here would fail on None.
    date = date or datetime.date.today().isoformat()
    redis.setbit(f"active:{date}", user_id, 1)

def was_active(user_id, date):
    return redis.getbit(f"active:{date}", user_id) == 1

def count_active(date):
    return redis.bitcount(f"active:{date}")

# Users active on multiple days
def active_all_days(dates):
    keys = [f"active:{d}" for d in dates]
    result_key = "temp:active_intersection"
    redis.bitop("AND", result_key, *keys)
    count = redis.bitcount(result_key)
    redis.delete(result_key)
    return count
```

## Expiration Strategies

```bash
# Set TTL
EXPIRE key 3600
EXPIREAT key 1735689600  # Unix timestamp

# Check TTL
TTL key  # Returns -1 if no expiry, -2 if doesn't exist

# Remove expiration
PERSIST key

# Set value and TTL atomically
SETEX key 3600 "value"
```

### Lazy Expiration Pattern

```python
def get_with_soft_expire(key, ttl=3600, soft_ttl=300):
    """
    Returns cached value but triggers background refresh
    if within soft_ttl of expiration.
    """
    pipe = redis.pipeline()
    pipe.get(key)
    pipe.ttl(key)
    value, remaining_ttl = pipe.execute()

    # TTL is -1 for a key with no expiry; such a key is not about to
    # expire, so only non-negative TTLs count as "within soft_ttl".
    if value and 0 <= remaining_ttl < soft_ttl:
        # Trigger async refresh
        refresh_cache_async.delay(key)

    return value
```

## Transactions and Lua Scripts

### Pipeline (Batching)

```python
pipe = redis.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
pipe.execute()  # Single round trip
```

### Transaction with WATCH

```python
# `redis` in these examples is the client object, not the module, so the
# exception class has to be imported explicitly.
from redis.exceptions import WatchError

def transfer(from_account, to_account, amount):
    with redis.pipeline() as pipe:
        while True:
            try:
                # Watch for changes
                pipe.watch(from_account, to_account)

                from_balance = int(pipe.get(from_account) or 0)
                if from_balance < amount:
                    pipe.unwatch()
                    return False

                # Start transaction
                pipe.multi()
                pipe.decrby(from_account, amount)
                pipe.incrby(to_account, amount)
                pipe.execute()
                return True
            except WatchError:
                # Retry if watched keys changed
                continue
```

### Lua Script (Atomic Operations)

```python
# Rate limiter as Lua script
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('INCR', key)
if current == 1 then
    redis.call('EXPIRE', key, window)
end

if current > limit then
    return 0
else
    return 1
end
"""

rate_limit = redis.register_script(RATE_LIMIT_SCRIPT)

def check_rate_limit(user_id, limit=100, window=60):
    key = f"ratelimit:{user_id}:{int(time.time() // window)}"
    return rate_limit(keys=[key], args=[limit, window]) == 1
```

## Monitoring

```bash
# Real-time commands
MONITOR

# Stats
INFO
INFO memory
INFO stats

# Slow queries
SLOWLOG GET 10

# Connected clients
CLIENT LIST

# Memory usage for a key
MEMORY USAGE mykey
```

Redis excels when you match the right data structure to your problem. Lists for queues, sorted sets for leaderboards, HyperLogLog for counting uniques—each has its sweet spot.
...