from cachetools import TTLCache
from threading import Lock

# In-process cache: at most 1000 entries, each entry evicted 5 minutes after insert.
cache = TTLCache(maxsize=1000, ttl=300)  # 5 minute TTL
cache_lock = Lock()  # TTLCache is not thread-safe; guard every access


def get_user_profile(user_id: int) -> dict:
    """Return the user's profile, serving from the in-process cache when possible.

    On a miss, fetches from the database and populates the cache. Note that
    concurrent misses for the same key may each hit the database (there is no
    per-key locking here) — an acceptable, bounded stampede for this cache.
    """
    cache_key = f"user:{user_id}"
    with cache_lock:
        if cache_key in cache:
            return cache[cache_key]
    # Cache miss - fetch from database.
    # Fix: use a parameterized query instead of f-string interpolation into
    # SQL. user_id is annotated int, but string-built SQL is an injection
    # hazard the moment a caller passes anything else — defense in depth.
    profile = db.query("SELECT * FROM users WHERE id = ?", (user_id,))
    with cache_lock:
        cache[cache_key] = profile
    return profile
import redis
import json

# Single shared Redis connection used by all cache helpers below.
r = redis.Redis(host='localhost', port=6379, db=0)


def cache_get(key: str) -> dict | None:
    """Return the JSON-decoded value stored under ``key``, or None if absent."""
    raw = r.get(key)
    if not raw:
        return None
    return json.loads(raw)


def cache_set(key: str, value: dict, ttl: int = 300):
    """Store ``value`` as JSON under ``key``, expiring after ``ttl`` seconds."""
    payload = json.dumps(value)
    r.setex(key, ttl, payload)


def cache_delete(key: str):
    """Remove ``key`` from the cache (harmless if the key does not exist)."""
    r.delete(key)


# Usage
user = cache_get("user:123")
if not user:
    user = fetch_user_from_db(123)
    cache_set("user:123", user, ttl=600)
def save_user(user_id: int, data: dict):
    """Write-through: persist to the database first, then refresh the cache."""
    key = f"user:{user_id}"
    db.save_user(user_id, data)
    cache_set(key, data, ttl=3600)


def get_user(user_id: int) -> dict:
    """Cache-aside read: serve from cache, falling back to the database.

    A miss populates the cache so subsequent reads are fast.
    """
    key = f"user:{user_id}"
    hit = cache_get(key)
    if hit:
        return hit
    fresh = db.get_user(user_id)
    cache_set(key, fresh, ttl=3600)
    return fresh
Simplest approach — data expires after a set time.
# Volatile data gets a short TTL so stale reads are bounded to one minute.
cache_set("stock_price:AAPL", price, ttl=60)  # 1 minute

# Stable reference data can safely live a full day between refreshes.
cache_set("country_list", countries, ttl=86400)  # 24 hours
Pros: Simple, automatic cleanup
Cons: Data can be served stale until the TTL expires
def update_user(user_id: int, data: dict):
    """Update a user and invalidate every cache entry derived from that user.

    Also broadcasts a ``user_updated`` event so other services can drop
    their own cached copies.
    """
    db.update_user(user_id, data)
    # Invalidate all related caches
    cache_delete(f"user:{user_id}")
    cache_delete(f"user:{user_id}:profile")
    cache_delete(f"user:{user_id}:permissions")
    # Publish event for other services.
    # Bug fix: the original called ``redis.publish`` on the *module*; publish
    # is a method of the client instance ``r``.
    r.publish("user_updated", json.dumps({"user_id": user_id}))
def cache_set_with_tags(key: str, value: dict, tags: list[str], ttl: int = 300):
    """Set cache with tags for group invalidation.

    Each tag maintains a Redis set of the keys registered under it, so a
    whole group of entries can be dropped at once.
    """
    r.setex(key, ttl, json.dumps(value))
    for tag in tags:
        tag_key = f"tag:{tag}"
        r.sadd(tag_key, key)
        r.expire(tag_key, ttl)


def invalidate_by_tag(tag: str):
    """Invalidate all cache entries registered under ``tag``, then the tag set itself."""
    tag_key = f"tag:{tag}"
    members = r.smembers(tag_key)
    if members:
        r.delete(*members)
    r.delete(tag_key)


# Usage
cache_set_with_tags(
    "product:123",
    product_data,
    tags=["products", "category:electronics"],
    ttl=3600,
)

# Invalidate all products
invalidate_by_tag("products")
import threading

# Per-key locks so only one thread recomputes a missing cache entry.
# NOTE(review): this dict grows without bound as new keys appear; fine for a
# small, stable key space — consider pruning if keys are unbounded.
locks = {}
_locks_guard = threading.Lock()  # serializes creation of the per-key locks


def get_with_lock(key: str, fetch_func):
    """Return the cached value for ``key``, computing it at most once on a miss.

    Prevents a cache stampede: concurrent callers that miss all block on the
    same per-key lock while a single thread runs ``fetch_func``.
    """
    cached = cache_get(key)
    if cached:
        return cached
    # Bug fix: the original's unguarded ``if key not in locks`` check-then-set
    # could race — two threads could each install a *different* Lock for the
    # same key and both call fetch_func. Create the per-key lock atomically.
    with _locks_guard:
        key_lock = locks.setdefault(key, threading.Lock())
    with key_lock:
        # Double-check after acquiring the lock: another thread may have
        # populated the cache while we were waiting.
        cached = cache_get(key)
        if cached:
            return cached
        # Only one thread fetches
        value = fetch_func()
        cache_set(key, value)
        return value
import threading


def get_with_background_refresh(key: str, fetch_func, ttl: int = 300):
    """Serve ``key`` from cache, refreshing it in the background near expiry.

    When the remaining TTL drops below 20% of ``ttl``, the still-valid cached
    value is returned immediately while a background thread recomputes it, so
    callers never block on the refresh. On a miss, fetches synchronously.
    """
    cached = cache_get(key)
    if cached:
        # Fix: only consult the TTL on a cache hit — the original queried
        # ``r.ttl`` unconditionally, costing a pointless extra round-trip on
        # every miss.
        remaining_ttl = r.ttl(key)
        # Refresh in background if TTL < 20%
        if remaining_ttl < ttl * 0.2:
            # Fire-and-forget; daemon=True so a pending refresh can never
            # block interpreter shutdown.
            threading.Thread(
                target=lambda: cache_set(key, fetch_func(), ttl),
                daemon=True,
            ).start()
        return cached
    value = fetch_func()
    cache_set(key, value, ttl)
    return value