Redis is often introduced as “a cache” but it’s really a data structure server. Understanding its primitives unlocks patterns far beyond simple key-value storage.
Basic Caching#
The fundamental pattern: cache expensive operations.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import redis
import json

# Shared client; decode_responses=True makes reads return str instead of bytes.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def get_user(user_id: str) -> dict:
    """Fetch a user record, serving from Redis when a cached copy exists."""
    cache_key = f"user:{user_id}"

    payload = r.get(cache_key)
    if payload:
        return json.loads(payload)  # cache hit

    # Cache miss: read the source of truth, then cache it with a 1 hour TTL.
    user = db.query_user(user_id)
    r.setex(cache_key, 3600, json.dumps(user))
    return user
Cache-Aside Pattern#
The application manages the cache explicitly:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def get_product(product_id: str) -> dict:
    """Cache-aside read: consult Redis first, fall back to the database."""
    cache_key = f"product:{product_id}"

    # 1. Cache hit: deserialize and return immediately.
    payload = r.get(cache_key)
    if payload:
        return json.loads(payload)

    # 2. Miss: load the authoritative copy from the source.
    product = db.get_product(product_id)

    # 3. Populate the cache for the next reader (1 hour TTL).
    r.setex(cache_key, 3600, json.dumps(product))
    return product


def update_product(product_id: str, data: dict):
    """Write path: persist to the database, then drop the stale cache entry."""
    db.update_product(product_id, data)
    # Deleting (not rewriting) the key lets the next read repopulate it.
    r.delete(f"product:{product_id}")
Write-Through Pattern#
Write to cache and database together:
1
2
3
4
5
6
def save_user_settings(user_id: str, settings: dict):
    """Write-through: persist settings and refresh the cached copy together."""
    serialized = json.dumps(settings)
    db.save_settings(user_id, settings)
    # Mirror the write into Redis with a 24 hour TTL.
    r.setex(f"settings:{user_id}", 86400, serialized)
Cache Invalidation Strategies#
Time-Based Expiration#
1
2
3
4
5
# Relative expiry: SETEX stores the value and its 1 hour TTL atomically.
r.setex("key", 3600, "value")

# Absolute expiry: EXPIREAT takes a Unix timestamp.
# NOTE(review): assumes `tomorrow` is a datetime — confirm in context.
r.expireat("key", int(tomorrow.timestamp()))
Event-Based Invalidation#
1
2
3
4
5
6
7
8
9
10
11
12
13
def on_user_updated(user_id: str):
    """Invalidate every cache entry derived from this user."""
    # Delete the primary record plus dependent sub-caches in one command.
    r.delete(
        f"user:{user_id}",
        f"user:{user_id}:profile",
        f"user:{user_id}:settings",
    )


def on_product_category_changed(category_id: str):
    """Invalidate all cached products belonging to a category."""
    # Use SCAN (cursor-based, non-blocking) instead of KEYS: KEYS walks the
    # entire keyspace in a single blocking call and can stall a production
    # Redis instance.
    keys = list(r.scan_iter(match=f"product:category:{category_id}:*"))
    if keys:
        r.delete(*keys)
Tag-Based Invalidation#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def cache_with_tags(key: str, value: str, tags: list, ttl: int = 3600):
    """Cache `value` under `key` and index the key in each tag's set."""
    r.setex(key, ttl, value)
    for tag in tags:
        tag_key = f"tag:{tag}"
        r.sadd(tag_key, key)    # remember that `key` belongs to this tag
        r.expire(tag_key, ttl)  # keep the tag set from outliving its entries
def invalidate_tag(tag: str):
    """Delete every cached key carrying `tag`, then the tag set itself."""
    tag_key = f"tag:{tag}"
    members = r.smembers(tag_key)
    if members:
        r.delete(*members)
    r.delete(tag_key)
# Usage
cache_with_tags("product:123", data, ["category:electronics", "brand:apple"])
invalidate_tag("brand:apple")  # Clears all Apple products
Rate Limiting#
Fixed Window#
1
2
3
4
5
6
7
8
def is_rate_limited(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Fixed-window rate limiter: at most `limit` calls per `window` seconds.

    Returns True when the caller has exceeded the limit for the current
    window. Each window gets its own counter key.
    """
    # The bucket suffix changes every `window` seconds -> discrete windows.
    key = f"ratelimit:{user_id}:{int(time.time()) // window}"
    current = r.incr(key)
    # Attach a TTL on the first increment. Also heal the case where a prior
    # process crashed between INCR and EXPIRE (ttl == -1 means "no expiry"),
    # which would otherwise leave the counter key in Redis forever.
    if current == 1 or r.ttl(key) == -1:
        r.expire(key, window)
    return current > limit
Sliding Window#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def is_rate_limited_sliding(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Sliding-window rate limiter backed by a sorted set of timestamps.

    Returns True when `limit` requests already occurred in the last
    `window` seconds. The request is recorded either way.
    """
    key = f"ratelimit:{user_id}"
    now = time.time()
    # Use a nanosecond-resolution member for uniqueness: the original
    # str(now) member collides when two requests land at the same float
    # second, silently counting them as one.
    member = str(time.time_ns())
    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, now - window)  # drop entries outside the window
    pipe.zcard(key)                              # requests still inside the window
    pipe.zadd(key, {member: now})                # record this request
    pipe.expire(key, window)                     # garbage-collect idle limiters
    results = pipe.execute()
    count = results[1]
    return count >= limit
Token Bucket#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def check_token_bucket(user_id: str, tokens_per_second: float = 10, bucket_size: int = 100) -> bool:
    """Token-bucket rate limiter. Returns True when a token was consumed.

    Implemented as a Lua script so the read -> refill -> consume sequence
    executes atomically inside Redis. The original Python read-modify-write
    loses or double-spends tokens when concurrent callers interleave
    between HGETALL and HSET.
    """
    script = """
    local key = KEYS[1]
    local rate = tonumber(ARGV[1])
    local size = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local data = redis.call("HMGET", key, "tokens", "last_update")
    local tokens = tonumber(data[1])
    local last = tonumber(data[2])
    if tokens == nil then
        -- First sighting of this user: start with a full bucket.
        tokens = size
        last = now
    end
    -- Refill based on elapsed time, capped at the bucket size.
    tokens = math.min(size, tokens + (now - last) * rate)
    local allowed = 0
    if tokens >= 1 then
        tokens = tokens - 1
        allowed = 1
    end
    redis.call("HSET", key, "tokens", tokens, "last_update", now)
    redis.call("EXPIRE", key, 3600)
    return allowed
    """
    result = r.eval(script, 1, f"bucket:{user_id}",
                    tokens_per_second, bucket_size, time.time())
    return result == 1
Session Storage#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import secrets


def create_session(user_id: str, ttl: int = 86400) -> str:
    """Mint a new session, store it as a Redis hash, and index it per user."""
    session_id = secrets.token_urlsafe(32)  # CSPRNG-backed, URL-safe token
    session_key = f"session:{session_id}"
    r.hset(session_key, mapping={
        "user_id": user_id,
        "created_at": time.time(),
        "ip": request.remote_addr,
    })
    r.expire(session_key, ttl)
    # Index the session under its owner so every session can be revoked at once.
    r.sadd(f"user_sessions:{user_id}", session_id)
    return session_id


def get_session(session_id: str) -> dict | None:
    """Fetch a session hash; None when it is missing or expired."""
    data = r.hgetall(f"session:{session_id}")
    return data or None


def destroy_session(session_id: str):
    """Remove one session and its entry in the owner's session index."""
    session = get_session(session_id)
    if session:
        r.srem(f"user_sessions:{session['user_id']}", session_id)
    r.delete(f"session:{session_id}")


def destroy_all_user_sessions(user_id: str):
    """Revoke every session belonging to a user (e.g. on password change)."""
    index_key = f"user_sessions:{user_id}"
    for session_id in r.smembers(index_key):
        r.delete(f"session:{session_id}")
    r.delete(index_key)
Distributed Locking#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import uuid

# Lua: delete the lock only when the stored token matches ours — the
# compare-and-delete must be atomic, or we could free someone else's lock.
_RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""


def acquire_lock(lock_name: str, timeout: int = 10) -> str | None:
    """Try to take a named lock; return an owner token, or None if held."""
    token = str(uuid.uuid4())
    # SET NX EX: create-if-absent with an expiry, in one atomic command.
    acquired = r.set(f"lock:{lock_name}", token, nx=True, ex=timeout)
    return token if acquired else None


def release_lock(lock_name: str, lock_id: str) -> bool:
    """Release the lock, but only if `lock_id` still owns it."""
    return r.eval(_RELEASE_SCRIPT, 1, f"lock:{lock_name}", lock_id) == 1


# Usage
lock_id = acquire_lock("process_order")
if lock_id:
    try:
        process_order()
    finally:
        release_lock("process_order", lock_id)
Pub/Sub for Real-Time#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Publisher
def publish_event(channel: str, event: dict):
    """Broadcast an event to every current subscriber of `channel`."""
    r.publish(channel, json.dumps(event))


# Subscriber
def listen_events(channel: str):
    """Block forever, dispatching each published event to handle_event."""
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        # listen() also yields subscribe/unsubscribe confirmations; skip them.
        if message["type"] != "message":
            continue
        handle_event(json.loads(message["data"]))


# Usage
publish_event("orders", {"type": "created", "order_id": "123"})
Leaderboards with Sorted Sets#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def add_score(user_id: str, score: int):
    """Record (or overwrite) a player's score on the global leaderboard."""
    r.zadd("leaderboard", {user_id: score})


def get_rank(user_id: str) -> int | None:
    """Return the player's 1-based rank (highest score = 1), or None if absent.

    ZREVRANK returns nil for unknown members, so None is a real outcome —
    the original `-> int` annotation was wrong.
    """
    rank = r.zrevrank("leaderboard", user_id)  # 0-indexed, highest first
    return rank + 1 if rank is not None else None


def get_top_players(count: int = 10) -> list:
    """Return the top `count` players as [(user_id, score), ...]."""
    return r.zrevrange("leaderboard", 0, count - 1, withscores=True)


def get_nearby_players(user_id: str, count: int = 5) -> list:
    """Return up to `count` players on either side of the given player."""
    rank = r.zrevrank("leaderboard", user_id)
    if rank is None:
        return []
    start = max(0, rank - count)
    end = rank + count  # zrevrange's end index is inclusive
    return r.zrevrange("leaderboard", start, end, withscores=True)
Counting Unique Items (HyperLogLog)#
1
2
3
4
5
6
7
8
9
def track_unique_visitor(page: str, visitor_id: str):
    """Register a visitor in today's HyperLogLog for the page."""
    r.pfadd(f"visitors:{page}:{date.today()}", visitor_id)


def get_unique_visitors(page: str, day: date) -> int:
    """Approximate distinct-visitor count for a single day."""
    return r.pfcount(f"visitors:{page}:{day}")


def get_unique_visitors_range(page: str, start: date, end: date) -> int:
    """Approximate distinct visitors across a span of days."""
    daily_keys = [f"visitors:{page}:{d}" for d in date_range(start, end)]
    # PFCOUNT over multiple keys counts the union without merging them.
    return r.pfcount(*daily_keys)
HyperLogLog uses ~12KB regardless of cardinality, with ~0.81% error rate.
Caching Best Practices#
Use Consistent Key Naming#
1
2
3
4
5
6
7
# Pattern: {type}:{id}:{subtype}
# Colon-delimited segments group related keys and play well with
# SCAN MATCH patterns and redis-cli tooling.
"user:123"
"user:123:settings"
"user:123:sessions"
"product:456"
"product:456:reviews"
"cache:api:users:list:page:1"  # deeper hierarchies are fine; stay consistent
Handle Cache Failures Gracefully#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def get_cached_data(key: str, fetch_func):
    """Read-through cache that degrades to the source when Redis is down."""
    # Cache read: failures are non-fatal — fall through to the source.
    try:
        cached = r.get(key)
    except redis.RedisError:
        cached = None  # log in real code; a dead cache must not break reads
    if cached:
        return json.loads(cached)

    data = fetch_func()

    # Cache write is best-effort too: losing it only costs a future miss.
    try:
        r.setex(key, 3600, json.dumps(data))
    except redis.RedisError:
        pass
    return data
Prevent Cache Stampede#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def get_with_lock(key: str, fetch_func, ttl: int = 3600, max_wait: float = 10.0):
    """Cache read that prevents a stampede: only one process recomputes.

    On a miss, the first caller takes a short-lived lock and repopulates the
    cache; everyone else polls until the value appears. The original waited
    via unbounded recursion, which can raise RecursionError (or spin
    forever) when the fetch is slow or the lock holder dies. This version
    polls in a loop and falls back to a direct fetch after `max_wait`
    seconds (new parameter, backward-compatible default).
    """
    lock_key = f"lock:{key}"
    deadline = time.time() + max_wait

    while True:
        cached = r.get(key)
        if cached:
            return json.loads(cached)

        # Try to become the single process that recomputes the value.
        # The 30 s expiry keeps a crashed holder from blocking forever.
        if r.set(lock_key, "1", nx=True, ex=30):
            try:
                data = fetch_func()
                r.setex(key, ttl, json.dumps(data))
                return data
            finally:
                r.delete(lock_key)

        if time.time() >= deadline:
            # Lock holder is slow or gone; compute directly rather than hang.
            return fetch_func()

        # Another process is fetching — wait briefly, then re-check the cache.
        time.sleep(0.1)
Quick Reference#
| Pattern | Use Case | Data Structure |
|---|---|---|
| Simple cache | Key-value storage | STRING |
| Sessions | User session data | HASH |
| Rate limiting | API throttling | STRING/ZSET |
| Leaderboards | Rankings | ZSET |
| Unique counts | Analytics | HyperLogLog |
| Real-time events | Notifications | Pub/Sub |
| Distributed locks | Coordination | STRING with NX |
| Tag invalidation | Cache groups | SET |
Redis is fast because it’s in-memory, but its real power comes from choosing the right data structure for your problem. A sorted set for leaderboards, HyperLogLog for unique counts, pub/sub for real-time — these patterns let you solve problems that would be complex or slow with a traditional database.
📬 Get the Newsletter
Weekly insights on DevOps, automation, and CLI mastery. No spam, unsubscribe anytime.