Redis is often introduced as “just a cache,” but it’s a versatile data structure server. These patterns unlock its full potential.
Connection Basics#
1
2
3
4
5
6
7
8
9
10
11
# Connect to a local Redis server
redis-cli -h localhost -p 6379

# With password (consider the REDISCLI_AUTH env var to keep it out of shell history)
redis-cli -h localhost -p 6379 -a yourpassword

# Select database (0-15); databases are numbered, not named
SELECT 1

# Check connectivity — server replies PONG
PING
Caching Patterns#
Basic Cache with TTL#
1
2
3
4
5
6
7
8
9
10
11
# Set with expiration (seconds)
SET user:123:profile '{"name":"Alice"}' EX 3600

# Set with expiration (milliseconds)
SET session:abc123 '{"user_id":123}' PX 86400000

# Set only if not exists (returns 0 and leaves the key untouched otherwise)
SETNX cache:key "value"

# Set only if exists (update in place; no-op when the key is missing)
SET cache:key "newvalue" XX
Cache-Aside Pattern#
1
2
3
4
5
6
7
8
9
10
11
12
def get_user(user_id):
    """Cache-aside read: serve from Redis when possible, else load and cache.

    The cache holds a JSON-encoded copy of the DB row, valid for one hour.
    """
    cache_key = f"user:{user_id}"
    cached = redis.get(cache_key)
    if cached:
        return json.loads(cached)
    # Miss: the database remains the source of truth.
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    # Populate the cache so the next hour of reads skips the DB.
    redis.setex(cache_key, 3600, json.dumps(user))
    return user
Write-Through Pattern#
1
2
3
4
5
6
def update_user(user_id, data):
    """Write-through: persist to the database, then refresh the cached copy.

    Keeping both writes together avoids serving stale data after an update.
    """
    # Database first — it is the source of truth.
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)
    # Then overwrite the cache entry with the new value and a fresh TTL.
    cache_key = f"user:{user_id}"
    redis.setex(cache_key, 3600, json.dumps(data))
Cache Stampede Prevention#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def get_with_lock(key, fetch_func, ttl=3600, lock_ttl=10):
    """Cache read with stampede protection: one fetcher, everyone else waits.

    Args:
        key: cache key holding a JSON-encoded value.
        fetch_func: zero-arg callable that recomputes the value on a miss.
        ttl: lifetime (seconds) of the recomputed cache entry.
        lock_ttl: lock lifetime (seconds); bounds how long a crashed
            fetcher can block the others.

    Returns:
        The decoded cached (or freshly fetched) value.
    """
    lock_key = f"lock:{key}"
    while True:
        value = redis.get(key)
        if value:
            return json.loads(value)
        # SET NX doubles as a mutex; EX guarantees it self-releases if the
        # holder dies mid-fetch.
        if redis.set(lock_key, "1", nx=True, ex=lock_ttl):
            try:
                value = fetch_func()
                redis.setex(key, ttl, json.dumps(value))
                return value
            finally:
                redis.delete(lock_key)
        # Another process is fetching; back off briefly, then re-check the
        # cache. (Iterative retry replaces the original recursive call,
        # which could exhaust the interpreter's recursion limit under
        # sustained contention.)
        time.sleep(0.1)
Session Storage#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import secrets


def create_session(user_id, ttl=86400):
    """Mint an unguessable session id and store its payload with a TTL."""
    # secrets (not random) — session ids are security-sensitive tokens.
    token = secrets.token_urlsafe(32)
    payload = json.dumps({"user_id": user_id, "created_at": time.time()})
    redis.setex(f"session:{token}", ttl, payload)
    return token


def get_session(session_id):
    """Return the decoded session payload, or None when expired/unknown."""
    raw = redis.get(f"session:{session_id}")
    if not raw:
        return None
    return json.loads(raw)


def extend_session(session_id, ttl=86400):
    """Push the session's expiry out by another `ttl` seconds."""
    redis.expire(f"session:{session_id}", ttl)


def destroy_session(session_id):
    """Log out: remove the session key immediately."""
    redis.delete(f"session:{session_id}")
Rate Limiting#
Fixed Window#
1
2
3
4
5
6
7
8
def is_rate_limited(user_id, limit=100, window=60):
    """Fixed-window limiter: True once the caller exceeds `limit` hits
    in the current `window`-second bucket.

    The bucket index is baked into the key, so every window starts with
    a fresh counter.
    """
    bucket = int(time.time() // window)
    key = f"ratelimit:{user_id}:{bucket}"
    # INCR and EXPIRE travel in one pipeline so a crash between the two
    # calls cannot leave an immortal counter behind (the original set the
    # TTL only on the first hit, from a separate round trip). Refreshing
    # the TTL on every hit is harmless: the bucket suffix already
    # confines the key to a single window.
    pipe = redis.pipeline()
    pipe.incr(key)
    pipe.expire(key, window)
    current, _ = pipe.execute()
    return current > limit
Sliding Window with Sorted Sets#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def is_rate_limited_sliding(user_id, limit=100, window=60):
    """Sliding-window limiter backed by a sorted set of request timestamps.

    Each request is a zset member scored by its arrival time; counting
    members inside the window gives the rolling request rate.
    """
    import uuid  # local: only needed to make members unique

    key = f"ratelimit:{user_id}"
    now = time.time()
    window_start = now - window
    # Members must be unique: the original used str(now), so two requests
    # landing on the same float timestamp collapsed into one entry and
    # were undercounted. A random suffix keeps each request distinct
    # while the score still carries the time.
    member = f"{now}:{uuid.uuid4().hex}"
    pipe = redis.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # drop entries older than the window
    pipe.zadd(key, {member: now})                # record this request
    pipe.zcard(key)                              # size == requests in window
    pipe.expire(key, window)                     # idle keys evaporate
    results = pipe.execute()
    request_count = results[2]
    return request_count > limit
Token Bucket#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def check_token_bucket(user_id, capacity=10, refill_rate=1):
    """Token-bucket limiter: True if a token was available and consumed.

    Args:
        capacity: maximum burst size (bucket never holds more tokens).
        refill_rate: tokens added per second of elapsed time.

    NOTE(review): the read-modify-write below is not atomic — two
    concurrent callers can observe the same token count. Port it to a
    Lua script if strict enforcement matters.
    """
    key = f"bucket:{user_id}"
    now = time.time()
    state = redis.hgetall(key)
    if state:
        # hgetall returns bytes keys unless the client was created with
        # decode_responses=True; the original indexed b'tokens' only and
        # crashed on decoded clients — accept either form.
        tokens = float(state.get(b'tokens', state.get('tokens')))
        last_update = float(state.get(b'last_update', state.get('last_update')))
        # Refill proportionally to elapsed time, capped at capacity.
        elapsed = now - last_update
        tokens = min(capacity, tokens + elapsed * refill_rate)
    else:
        tokens = capacity  # fresh bucket starts full
    if tokens < 1:
        # No update on rejection: last_update stays put, so the refill
        # keeps accruing for the next attempt.
        return False
    # Consume one token and persist the new state.
    redis.hset(key, mapping={
        'tokens': tokens - 1,
        'last_update': now,
    })
    # Expire once the bucket would have refilled completely anyway.
    redis.expire(key, int(capacity / refill_rate) + 1)
    return True
Queues and Pub/Sub#
Simple Queue with Lists#
1
2
3
4
5
6
7
8
9
10
# Producer
def enqueue(queue_name, message):
    """Push a JSON-encoded message onto the left end of the list."""
    redis.lpush(queue_name, json.dumps(message))


# Consumer (blocking)
def dequeue(queue_name, timeout=0):
    """Pop the next message, blocking up to `timeout` seconds (0 = forever).

    Returns the decoded message, or None when the wait timed out.
    """
    popped = redis.brpop(queue_name, timeout)
    if popped is None:
        return None
    # brpop yields (queue_name, payload); only the payload matters here.
    _, payload = popped
    return json.loads(payload)
Reliable Queue with RPOPLPUSH#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def reliable_dequeue(queue_name, processing_queue):
    """Atomically move the next item into a processing list and decode it.

    An item whose worker crashes mid-processing stays visible in
    `processing_queue` instead of vanishing. (RPOPLPUSH is deprecated in
    Redis >= 6.2 in favour of LMOVE, but behaves identically here.)
    """
    raw = redis.rpoplpush(queue_name, processing_queue)
    if not raw:
        return None
    return json.loads(raw)


def ack(processing_queue, item):
    """Acknowledge completion: drop one matching copy from the processing list."""
    redis.lrem(processing_queue, 1, json.dumps(item))


def requeue_failed(processing_queue, queue_name):
    """Crash recovery: drain the processing list back onto the main queue."""
    while redis.rpoplpush(processing_queue, queue_name):
        pass
Pub/Sub#
1
2
3
4
5
6
7
8
9
10
11
12
# Publisher
def publish_event(channel, event):
    """Fire-and-forget broadcast of a JSON event to all current subscribers."""
    redis.publish(channel, json.dumps(event))


# Subscriber
def subscribe(channel, callback):
    """Invoke `callback` with each decoded event on `channel`; blocks forever."""
    pubsub = redis.pubsub()
    pubsub.subscribe(channel)
    for msg in pubsub.listen():
        # listen() also yields subscribe/unsubscribe confirmations —
        # only real payloads reach the callback.
        if msg['type'] != 'message':
            continue
        callback(json.loads(msg['data']))
Leaderboards with Sorted Sets#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def add_score(leaderboard, user_id, score):
    """Insert or overwrite a member's score."""
    redis.zadd(leaderboard, {user_id: score})


def increment_score(leaderboard, user_id, amount):
    """Add `amount` to a member's score (missing members start at 0)."""
    redis.zincrby(leaderboard, amount, user_id)


def get_rank(leaderboard, user_id):
    """1-based rank with highest score first, or None if the user is absent."""
    zero_based = redis.zrevrank(leaderboard, user_id)
    if zero_based is None:
        return None
    return zero_based + 1


def get_top(leaderboard, count=10):
    """Top `count` (member, score) pairs, best first."""
    return redis.zrevrange(leaderboard, 0, count - 1, withscores=True)


def get_around_user(leaderboard, user_id, count=5):
    """Up to `count` neighbours on each side of the user, best first.

    Returns [] when the user is not on the leaderboard.
    """
    rank = redis.zrevrank(leaderboard, user_id)
    if rank is None:
        return []
    low = max(0, rank - count)
    high = rank + count  # zrevrange treats the end index as inclusive
    return redis.zrevrange(leaderboard, low, high, withscores=True)
Distributed Locks#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import uuid


class RedisLock:
    """Single-instance distributed lock with an auto-expiring key.

    A random per-lock token marks ownership, so a client can never
    release a lock that another client has since acquired (e.g. after
    this holder's TTL lapsed).
    """

    def __init__(self, redis_client, key, ttl=10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.ttl = ttl
        # Unique per lock object; proves ownership at release time.
        self.token = str(uuid.uuid4())

    def acquire(self, blocking=True, timeout=None):
        """Try to take the lock; return True on success.

        With blocking=True, polls every 100 ms until acquired or until
        `timeout` seconds elapse (forever when timeout is None).
        """
        started = time.time()
        while not self.redis.set(self.key, self.token, nx=True, ex=self.ttl):
            if not blocking:
                return False
            if timeout and (time.time() - started) > timeout:
                return False
            time.sleep(0.1)
        return True

    def release(self):
        """Delete the key only if this instance still owns it.

        Compare-and-delete runs as a Lua script so the check and the
        delete are a single atomic step.
        """
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(script, 1, self.key, self.token)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()


# Usage
with RedisLock(redis, "my-resource"):
    # Critical section
    do_work()
Counting and Analytics#
HyperLogLog for Unique Counts#
1
2
3
4
5
6
7
8
9
10
11
# Count unique visitors (memory efficient)
def track_visitor(page, visitor_id):
    """Record one visitor in today's HyperLogLog for the page."""
    redis.pfadd(f"visitors:{page}:{date.today()}", visitor_id)


def get_unique_visitors(page, date):
    """Approximate distinct-visitor count for one page/day."""
    return redis.pfcount(f"visitors:{page}:{date}")


# Merge multiple days
def get_weekly_uniques(page):
    """Approximate distinct visitors across the last 7 daily HLLs.

    pfcount over several keys counts the union without materializing it.
    """
    day_keys = [f"visitors:{page}:{d}" for d in last_7_days()]
    return redis.pfcount(*day_keys)
Bitmaps for Daily Active Users#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def mark_active(user_id, date=None):
    """Set the user's bit in the day's activity bitmap.

    Args:
        user_id: integer offset into the bitmap.
        date: ISO date string; defaults to today.
    """
    if date is None:
        # BUG FIX: the original `date = date or date.today().isoformat()`
        # crashed on the default path — the parameter `date` shadows the
        # datetime.date class, so `.today()` was looked up on None.
        import datetime
        date = datetime.date.today().isoformat()
    redis.setbit(f"active:{date}", user_id, 1)


def was_active(user_id, date):
    """True if the user's bit is set for that day."""
    return redis.getbit(f"active:{date}", user_id) == 1


def count_active(date):
    """Number of users active on the given day (bitmap population count)."""
    return redis.bitcount(f"active:{date}")


# Users active on multiple days
def active_all_days(dates):
    """Count users active on *every* day in `dates` (bitwise AND of bitmaps).

    NOTE(review): uses a fixed temp key, so concurrent callers can
    clobber each other's intermediate result — suffix the key per call
    if this runs concurrently.
    """
    keys = [f"active:{d}" for d in dates]
    result_key = "temp:active_intersection"
    redis.bitop("AND", result_key, *keys)
    count = redis.bitcount(result_key)
    redis.delete(result_key)
    return count
Expiration Strategies#
1
2
3
4
5
6
7
8
9
10
11
12
# Set TTL (relative, seconds)
EXPIRE key 3600
# Set TTL at an absolute moment
EXPIREAT key 1735689600 # Unix timestamp
# Check TTL
TTL key # Returns -1 if no expiry, -2 if doesn't exist
# Remove expiration (key becomes permanent)
PERSIST key
# Set value and TTL atomically
SETEX key 3600 "value"
Lazy Expiration Pattern#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def get_with_soft_expire(key, ttl=3600, soft_ttl=300):
    """Return the cached value, kicking off a background refresh when the
    entry is within `soft_ttl` seconds of expiring.

    Callers always get an immediate answer; the refresh happens off the
    request path so the entry is replaced before it actually lapses.

    NOTE(review): `ttl` is unused here — presumably consumed by the
    refresh task; confirm against its implementation.
    """
    # Fetch value and remaining TTL in one round trip.
    pipe = redis.pipeline()
    pipe.get(key)
    pipe.ttl(key)
    cached, seconds_left = pipe.execute()
    if cached and seconds_left < soft_ttl:
        # Trigger async refresh
        refresh_cache_async.delay(key)
    return cached
Transactions and Lua Scripts#
Pipeline (Batching)#
1
2
3
4
# Queue 1000 SET commands client-side, then flush them to the server
# in one network exchange instead of 1000.
pipe = redis.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
pipe.execute()  # Single round trip
Transaction with WATCH#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def transfer(from_account, to_account, amount):
    """Atomically move `amount` between two counters via optimistic locking.

    WATCH makes the MULTI/EXEC abort if either balance changes between
    the read and the commit; the loop retries until the transfer wins
    the race. Returns False when funds are insufficient, True on success.
    """
    with redis.pipeline() as pipe:
        while True:
            try:
                # Watch for changes
                pipe.watch(from_account, to_account)
                # After watch() the pipeline is in immediate-execution
                # mode, so this GET runs right away (missing key -> 0).
                from_balance = int(pipe.get(from_account) or 0)
                if from_balance < amount:
                    pipe.unwatch()
                    return False
                # Start transaction
                pipe.multi()
                pipe.decrby(from_account, amount)
                pipe.incrby(to_account, amount)
                pipe.execute()
                return True
            except redis.WatchError:
                # Retry if watched keys changed
                # NOTE(review): WatchError lives on the redis *module*;
                # elsewhere in this file `redis` is used as a client
                # instance — confirm this attribute resolves, otherwise
                # catch redis.exceptions.WatchError explicitly.
                continue
Lua Script (Atomic Operations)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Rate limiter as Lua script
# Runs INCR plus the conditional EXPIRE server-side as one atomic unit,
# closing the crash window the plain-Python fixed-window version has
# between its two round trips.
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
    redis.call('EXPIRE', key, window)
end
if current > limit then
    return 0
else
    return 1
end
"""

# register_script returns a callable that uses EVALSHA with an EVAL
# fallback, so the script body is sent to the server only once.
rate_limit = redis.register_script(RATE_LIMIT_SCRIPT)


def check_rate_limit(user_id, limit=100, window=60):
    """True while the caller is under `limit` hits in the current window."""
    # Bucket index in the key gives each window a fresh counter.
    key = f"ratelimit:{user_id}:{int(time.time() // window)}"
    # Script returns 1 when allowed, 0 when over the limit.
    return rate_limit(keys=[key], args=[limit, window]) == 1
Monitoring#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Real-time command stream (costly on busy servers — use sparingly in production)
MONITOR
# Stats
INFO
INFO memory
INFO stats
# Slow queries (last 10 entries from the slow log)
SLOWLOG GET 10
# Connected clients
CLIENT LIST
# Memory usage for a key (bytes, including overhead)
MEMORY USAGE mykey
Redis excels when you match the right data structure to your problem. Lists for queues, sorted sets for leaderboards, HyperLogLog for counting uniques—each has its sweet spot.
Start simple with basic caching, then graduate to these patterns as your needs grow.
📬 Get the Newsletter
Weekly insights on DevOps, automation, and CLI mastery. No spam, unsubscribe anytime.