import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def get_user(user_id):
    """Cache-aside lookup: return the user record for *user_id*.

    Serves from Redis when the key is warm; otherwise falls back to the
    database and primes the cache with a 1-hour TTL.
    """
    cache_key = f"user:{user_id}"
    # Check cache first.
    hit = r.get(cache_key)
    if hit:
        return json.loads(hit)
    # Miss: fetch from the database (parameterized query).
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    # Cache with TTL so stale entries expire on their own.
    r.setex(cache_key, 3600, json.dumps(user))
    return user
import time


def is_rate_limited(user_id, limit=100, window=60):
    """Sliding-window rate limiter backed by a Redis sorted set.

    Returns True when *user_id* has already made *limit* or more requests
    within the last *window* seconds.  Each request is stored as a
    sorted-set member scored by its timestamp.

    Fix: the set member must be unique per request.  The previous
    ``str(now)`` member meant two requests landing on the same clock tick
    overwrote each other, silently undercounting.  A UUID suffix makes
    every entry distinct while the score still carries the timestamp.
    """
    import uuid  # local import keeps this snippet self-contained

    key = f"ratelimit:{user_id}"
    now = time.time()
    window_start = now - window

    pipe = r.pipeline()
    # Drop entries that have fallen out of the window.
    pipe.zremrangebyscore(key, 0, window_start)
    # Count requests still inside the window (before this one).
    pipe.zcard(key)
    # Record the current request with a collision-proof member.
    pipe.zadd(key, {f"{now}:{uuid.uuid4()}": now})
    # Let the whole key expire if the user goes quiet.
    pipe.expire(key, window)
    results = pipe.execute()

    request_count = results[1]
    # NOTE: denied requests are still recorded, so a client that keeps
    # hammering while limited keeps its window full — unchanged behavior.
    return request_count >= limit
import uuid


def acquire_lock(resource, timeout=10):
    """Try to take a distributed lock on *resource*.

    Returns a unique lock token on success, or None if the lock is
    currently held.  SET with nx+ex is a single atomic command, so there
    is no window between "create key" and "set expiry".
    """
    lock_id = str(uuid.uuid4())
    key = f"lock:{resource}"
    # SET NX with expiry (atomic)
    acquired = r.set(key, lock_id, nx=True, ex=timeout)
    return lock_id if acquired else None


def release_lock(resource, lock_id):
    """Release *resource* only if *lock_id* still owns it.

    A plain GET-then-DEL from Python could delete a lock that expired and
    was re-acquired by another client, so the compare-and-delete runs as
    one atomic Lua script.

    Returns True when the lock was deleted, False when it was already
    gone or owned by someone else (previously the result was discarded,
    so callers could not detect a lost lock).
    """
    key = f"lock:{resource}"
    # Only release if we own the lock (Lua for atomicity)
    script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
    return bool(r.eval(script, 1, key, lock_id))


# Usage
lock = acquire_lock("user:123:update")
if lock:
    try:
        # Do exclusive work
        update_user(123)
    finally:
        release_lock("user:123:update", lock)
def enqueue(queue_name, job):
    """Push *job* (JSON-serialized) onto the named queue."""
    r.lpush(f"queue:{queue_name}", json.dumps(job))


def dequeue(queue_name, timeout=0):
    """Pop the oldest job, blocking up to *timeout* seconds (0 = forever).

    Returns the decoded job dict, or None if the wait timed out.
    """
    # BRPOP blocks until an item is available; returns (key, value) or None.
    popped = r.brpop(f"queue:{queue_name}", timeout=timeout)
    return json.loads(popped[1]) if popped else None


# Producer
enqueue("emails", {"to": "user@example.com", "template": "welcome"})

# Worker
while True:
    job = dequeue("emails", timeout=5)
    if job:
        send_email(job)
With reliability (the job won't be lost if a worker crashes):
def reliable_dequeue(queue_name, processing_queue):
    """Atomically move one job from the main queue to a processing queue.

    BRPOPLPUSH keeps the job parked in *processing_queue* while it is
    being worked on, so a crashed worker leaves it recoverable instead of
    lost.  Returns the decoded job, or None on timeout.
    """
    raw = r.brpoplpush(f"queue:{queue_name}", f"queue:{processing_queue}", timeout=5)
    if raw is None:
        return None
    return json.loads(raw)


def complete_job(processing_queue, job):
    """Remove a finished *job* from the processing queue.

    NOTE(review): removal relies on json.dumps(job) reproducing the exact
    string stored in the list — assumes the producer serialized with the
    same key order; verify against the enqueue side.
    """
    r.lrem(f"queue:{processing_queue}", 1, json.dumps(job))
# Publisher
def publish_event(channel, event):
    """Broadcast *event* (JSON-encoded) to every subscriber of *channel*."""
    r.publish(channel, json.dumps(event))


publish_event("user_updates", {"user_id": 123, "action": "login"})


# Subscriber
def subscribe(channels):
    """Block forever, dispatching each published message to handle_event."""
    pubsub = r.pubsub()
    pubsub.subscribe(*channels)
    for message in pubsub.listen():
        # listen() also yields subscribe/unsubscribe confirmations; skip them.
        if message["type"] != "message":
            continue
        payload = json.loads(message["data"])
        handle_event(message["channel"], payload)


# Run subscriber in separate process/thread
subscribe(["user_updates", "system_alerts"])
def add_score(leaderboard, user_id, score):
    """Set *user_id*'s score on the leaderboard (replaces any previous)."""
    r.zadd(f"leaderboard:{leaderboard}", {user_id: score})


def increment_score(leaderboard, user_id, delta):
    """Add *delta* to *user_id*'s current score."""
    r.zincrby(f"leaderboard:{leaderboard}", delta, user_id)


def get_rank(leaderboard, user_id):
    """Return the 1-based rank (highest score = rank 1), or None if absent."""
    # ZREVRANK is 0-indexed and orders highest score first.
    position = r.zrevrank(f"leaderboard:{leaderboard}", user_id)
    if position is None:
        return None
    return position + 1


def get_top(leaderboard, count=10):
    """Return the top *count* (member, score) pairs, best first."""
    return r.zrevrange(f"leaderboard:{leaderboard}", 0, count - 1, withscores=True)


# Usage
add_score("weekly", "user:123", 1500)
increment_score("weekly", "user:123", 50)
print(get_rank("weekly", "user:123"))  # 3
print(get_top("weekly", 10))           # Top 10 with scores
def track_visitor(page, visitor_id):
    """Record *visitor_id* in today's HyperLogLog counter for *page*."""
    r.pfadd(f"visitors:{page}:{today()}", visitor_id)


def unique_visitors(page, date):
    """Approximate count of distinct visitors to *page* on *date*."""
    return r.pfcount(f"visitors:{page}:{date}")


# Track millions of visitors with ~12KB memory per counter
track_visitor("/home", "user_abc")
track_visitor("/home", "user_xyz")
track_visitor("/home", "user_abc")  # Duplicate
print(unique_visitors("/home", "2024-01-15"))  # ~2
def add_event(stream, event):
    """Append *event* (a flat field dict) to *stream*; returns its entry ID."""
    return r.xadd(stream, event)


def read_events(stream, last_id="0"):
    """Read up to 100 events after *last_id*, blocking up to 5 seconds."""
    return r.xread({stream: last_id}, count=100, block=5000)


def create_consumer_group(stream, group):
    """Create the consumer group (and the stream itself via mkstream) if missing."""
    try:
        r.xgroup_create(stream, group, id="0", mkstream=True)
    except redis.ResponseError:
        pass  # Group exists


def consume(stream, group, consumer):
    """Fetch up to 10 never-delivered events for this consumer, block 5s."""
    return r.xreadgroup(group, consumer, {stream: ">"}, count=10, block=5000)


def ack(stream, group, message_id):
    """Acknowledge *message_id* so the group drops it from pending."""
    r.xack(stream, group, message_id)


# Usage
add_event("orders", {"user": "123", "item": "widget", "qty": "2"})
create_consumer_group("orders", "processors")
events = consume("orders", "processors", "worker-1")
for stream, messages in events:
    for msg_id, data in messages:
        process_order(data)
        ack("orders", "processors", msg_id)
Redis is a Swiss Army knife. Caching is just the blade everyone knows about. Rate limiting, sessions, locks, queues, pub/sub, leaderboards — they’re all built in, battle-tested, and fast.
Learn the data structures. Match them to your problems. Redis probably has a primitive that fits.
📬 Get the Newsletter
Weekly insights on DevOps, automation, and CLI mastery. No spam, unsubscribe anytime.