API Pagination Patterns: Offset, Cursor, and Keyset

Every API that returns lists needs pagination. Without it, a request for "all users" could return millions of rows, crushing your database and timing out the client. But pagination has tradeoffs, and choosing wrong can hurt performance or cause data inconsistencies.

Offset Pagination

The classic approach. Simple to implement, simple to understand:

```
GET /users?limit=20&offset=0    # First page
GET /users?limit=20&offset=20   # Second page
GET /users?limit=20&offset=40   # Third page
```

```python
@app.get("/users")
def list_users(limit: int = 20, offset: int = 0):
    users = db.query(
        "SELECT * FROM users ORDER BY id LIMIT %s OFFSET %s",
        (limit, offset)
    )
    total = db.query("SELECT COUNT(*) FROM users")[0][0]
    return {
        "data": users,
        "pagination": {
            "limit": limit,
            "offset": offset,
            "total": total
        }
    }
```

Pros: ...
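For contrast with the offset example above, a keyset query filters on the last id seen instead of skipping rows, so the database never scans past already-served pages. A minimal runnable sketch using sqlite3 for illustration (the in-memory table and the `list_users_keyset` helper are illustrative, not from the post):

```python
import sqlite3

# Toy table standing in for the post's users table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(i, f"user{i}") for i in range(1, 51)],
)

def list_users_keyset(after_id=0, limit=20):
    # Keyset pagination: seek past the last id, never OFFSET
    rows = conn.execute(
        "SELECT id, name FROM users WHERE id > ? ORDER BY id LIMIT ?",
        (after_id, limit),
    ).fetchall()
    next_cursor = rows[-1][0] if rows else None
    return rows, next_cursor

page1, cursor = list_users_keyset()
page2, _ = list_users_keyset(after_id=cursor)
```

Because the `WHERE id > ?` predicate hits the primary-key index directly, page 1000 costs the same as page 1, which is the core advantage over OFFSET.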

March 1, 2026 · 7 min · 1286 words · Rob Washington

Database Connection Pooling: Stop Opening Connections for Every Query

Opening a database connection is expensive. TCP handshake, SSL negotiation, authentication, session setup: it all adds up. Do that for every query and your application crawls. Connection pooling fixes this by reusing connections. Here's how to do it right.

The Problem

Without pooling, every request opens a new connection:

```python
# BAD: New connection per request
def get_user(user_id):
    conn = psycopg2.connect(DATABASE_URL)  # ~50-100ms
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    user = cursor.fetchone()
    conn.close()
    return user
```

At 100 requests per second, that's 100 connections opening and closing per second. Your database server has a connection limit (typically 100-500). You'll exhaust it fast. ...
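The fix the post builds toward is a pool of long-lived connections that requests check out and return. The idea can be sketched with a stdlib queue; in real code you would use your driver's pool (e.g. psycopg2's pool classes) rather than this toy, and `make_conn` here is a stub standing in for something like `psycopg2.connect`:

```python
import queue

class SimplePool:
    """Minimal illustration of the pooling idea: open N connections
    up front, then reuse them for every request."""

    def __init__(self, make_conn, size=5):
        self._pool = queue.Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(make_conn())

    def acquire(self):
        return self._pool.get()   # Blocks if all connections are checked out

    def release(self, conn):
        self._pool.put(conn)

opened = 0

def make_conn():
    # Stub connection factory; counts how many "connections" were opened
    global opened
    opened += 1
    return object()

pool = SimplePool(make_conn, size=3)

def get_user(user_id):
    conn = pool.acquire()
    try:
        return f"user-{user_id}"  # A real version would run the query on conn
    finally:
        pool.release(conn)

for i in range(100):
    get_user(i)
```

One hundred requests touch only three connections, which is the whole point: connection cost is paid once at startup, not per query.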

March 1, 2026 · 5 min · 1021 words · Rob Washington

Caching Strategies: What to Cache and When to Invalidate

Cache invalidation is one of the two hard problems in computer science. Here's how to make it less painful.

The Caching Patterns

Cache-Aside (Lazy Loading)

```python
def get_user(user_id: str) -> dict:
    # Check cache first
    cached = redis.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)

    # Cache miss: fetch from database
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # Store in cache for next time
    redis.setex(f"user:{user_id}", 3600, json.dumps(user))
    return user
```

Pros: Only caches what's actually used
Cons: First request always slow (cache miss) ...
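On the "when to invalidate" half of the title, the simplest safe pattern is delete-on-write: whenever the source of truth changes, delete the cached entry so the next read repopulates it via cache-aside. A runnable sketch with an in-memory stand-in for Redis (the `FakeRedis` stub and the `db` dict are illustrative only, not from the post):

```python
import json
import time

class FakeRedis:
    """In-memory stand-in for a Redis client so the sketch runs anywhere."""
    def __init__(self):
        self._data = {}

    def get(self, key):
        entry = self._data.get(key)
        if entry and entry[1] > time.time():
            return entry[0]
        self._data.pop(key, None)
        return None

    def setex(self, key, ttl, value):
        self._data[key] = (value, time.time() + ttl)

    def delete(self, key):
        self._data.pop(key, None)

cache = FakeRedis()
db = {"42": {"id": "42", "name": "Alice"}}  # Stand-in for the real database

def get_user(user_id):
    # Cache-aside read, as in the pattern above
    cached = cache.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)
    user = db[user_id]
    cache.setex(f"user:{user_id}", 3600, json.dumps(user))
    return user

def update_user(user_id, fields):
    # Write to the source of truth first, then invalidate the cache
    db[user_id].update(fields)
    cache.delete(f"user:{user_id}")
```

Deleting rather than updating the cache avoids writing a value that a concurrent reader could immediately overwrite with stale data.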

February 28, 2026 · 5 min · 955 words · Rob Washington

Load Balancing: Beyond Round Robin

Round robin is the default, but it's rarely the best choice. Here's when to use each algorithm and why.

The Algorithms

Round Robin

```nginx
upstream backend {
    server 192.168.1.1:8080;
    server 192.168.1.2:8080;
    server 192.168.1.3:8080;
}
```

Requests go 1→2→3→1→2→3. Simple, fair, ignores server load.

Use when: All servers are identical and requests are uniform.

Problem: A slow server gets the same traffic as a fast one.

Weighted Round Robin

```nginx
upstream backend {
    server 192.168.1.1:8080 weight=5;
    server 192.168.1.2:8080 weight=3;
    server 192.168.1.3:8080 weight=2;
}
```

Server 1 gets 50%, server 2 gets 30%, server 3 gets 20%. ...
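Beyond the two round-robin variants shown, nginx also supports least-connections balancing, which helps when request durations vary: new requests go to whichever server currently has the fewest active connections. A sketch using the same upstream block (the `least_conn` directive is standard nginx; the addresses just mirror the examples above):

```nginx
upstream backend {
    least_conn;
    server 192.168.1.1:8080;
    server 192.168.1.2:8080;
    server 192.168.1.3:8080;
}
```

A server bogged down by a slow request naturally accumulates connections and stops receiving new traffic until it catches up, which addresses the "slow server gets the same traffic" problem directly.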

February 28, 2026 · 4 min · 811 words · Rob Washington

Linux Performance Troubleshooting: The First Five Minutes

When a server is slow and people are yelling, you need a systematic approach. Here's what to run in the first five minutes.

The Checklist

```shell
uptime
dmesg | tail
vmstat 1 5
mpstat -P ALL 1 3
pidstat 1 3
iostat -xz 1 3
free -h
sar -n DEV 1 3
```

Let's break down what each tells you.

1. uptime

```
$ uptime
16:30:01 up 45 days, 3:22, 2 users, load average: 8.42, 6.31, 5.12
```

Load averages: 1-minute, 5-minute, 15-minute. ...
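One interpretation note: a load average only means something relative to core count, so it helps to print the two side by side. A quick sketch (Linux-specific; reads `/proc/loadavg`):

```shell
# Compare the 1-minute load average to the number of CPU cores
cores=$(nproc)
load=$(cut -d ' ' -f1 /proc/loadavg)
echo "1-min load ${load} across ${cores} cores"
# Rough rule: sustained load well above the core count means saturation
```

A load of 8.42 is an emergency on a 2-core box and routine on a 32-core one.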

February 28, 2026 · 5 min · 1007 words · Rob Washington

Redis Patterns Beyond Simple Caching

Redis is often introduced as "a cache," but that undersells it. Here are patterns that leverage Redis for rate limiting, sessions, queues, and real-time features.

Pattern 1: Rate Limiting

The sliding window approach:

```python
import redis
import time

r = redis.Redis()

def is_rate_limited(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Allow `limit` requests per `window` seconds."""
    key = f"ratelimit:{user_id}"
    now = time.time()

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, now - window)  # Remove old entries
    pipe.zadd(key, {str(now): now})              # Add current request
    pipe.zcard(key)                              # Count requests in window
    pipe.expire(key, window)                     # Auto-cleanup
    results = pipe.execute()

    request_count = results[2]
    return request_count > limit
```

Using a sorted set with timestamps gives you a true sliding window, not just fixed buckets. ...

February 28, 2026 · 6 min · 1071 words · Rob Washington

Python Asyncio Patterns: Concurrency Without the Headaches

Asyncio enables concurrent I/O without threads. These patterns help you use it effectively without falling into common traps.

Basic Structure

```python
import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Python 3.7+
asyncio.run(main())
```

HTTP Requests with aiohttp

```python
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Usage
urls = [
    "https://api.example.com/users",
    "https://api.example.com/posts",
    "https://api.example.com/comments",
]
results = asyncio.run(fetch_all(urls))
```

Task Management

Running Tasks Concurrently

```python
async def task_a():
    await asyncio.sleep(2)
    return "A done"

async def task_b():
    await asyncio.sleep(1)
    return "B done"

async def main():
    # Run concurrently, wait for all
    results = await asyncio.gather(task_a(), task_b())
    print(results)  # ['A done', 'B done'] - takes ~2s total, not 3s

asyncio.run(main())
```

Handle Exceptions in gather

```python
async def might_fail(n):
    if n == 2:
        raise ValueError("Task 2 failed")
    await asyncio.sleep(n)
    return f"Task {n} done"

async def main():
    # return_exceptions=True prevents one failure from canceling others
    results = await asyncio.gather(
        might_fail(1),
        might_fail(2),
        might_fail(3),
        return_exceptions=True
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(result)

asyncio.run(main())
```

First Completed

```python
async def main():
    tasks = [
        asyncio.create_task(fetch(session, url1)),
        asyncio.create_task(fetch(session, url2)),
    ]
    # Return when first completes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel remaining
    for task in pending:
        task.cancel()
    return done.pop().result()
```

Timeout

```python
async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())
```

Semaphores (Limiting Concurrency)

```python
async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
```

Queues for Producer/Consumer

```python
async def producer(queue, items, num_consumers):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    # One sentinel per consumer so every worker exits
    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} processing: {item}")
        await asyncio.sleep(1)  # Simulate work
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    # Start producer and multiple consumers
    await asyncio.gather(
        producer(queue, range(20), num_consumers=2),
        consumer(queue, "Worker-1"),
        consumer(queue, "Worker-2"),
    )

asyncio.run(main())
```

Error Handling Patterns

Task Exception Handling

```python
async def risky_task():
    await asyncio.sleep(1)
    raise ValueError("Something went wrong")

async def main():
    task = asyncio.create_task(risky_task())
    try:
        await task
    except ValueError as e:
        print(f"Caught: {e}")

asyncio.run(main())
```

Background Task Exceptions

```python
def handle_exception(loop, context):
    msg = context.get("exception", context["message"])
    print(f"Caught exception: {msg}")

async def background_task():
    await asyncio.sleep(1)
    raise RuntimeError("Background failure")

async def main():
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handle_exception)
    # Fire and forget - exception won't crash main
    asyncio.create_task(background_task())
    await asyncio.sleep(5)

asyncio.run(main())
```

Context Managers

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource():
    print("Acquiring resource")
    resource = await create_resource()
    try:
        yield resource
    finally:
        print("Releasing resource")
        await resource.close()

async def main():
    async with managed_resource() as resource:
        await resource.do_something()
```

Running Blocking Code

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    # Simulates blocking I/O
    import time
    time.sleep(2)
    return "Done"

async def main():
    loop = asyncio.get_running_loop()

    # Run in default thread pool
    result = await loop.run_in_executor(None, blocking_io)
    print(result)

    # With custom executor
    with ThreadPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(executor, blocking_io)

asyncio.run(main())
```

Periodic Tasks

```python
async def periodic_task(interval, func):
    while True:
        await func()
        await asyncio.sleep(interval)

async def heartbeat():
    print("Heartbeat")

async def main():
    # Start periodic task in background
    task = asyncio.create_task(periodic_task(5, heartbeat))

    # Do other work
    await asyncio.sleep(20)

    # Cancel when done
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Periodic task cancelled")

asyncio.run(main())
```

Graceful Shutdown

```python
import signal

async def shutdown(sig, loop):
    print(f"Received {sig.name}")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda s=sig: asyncio.create_task(shutdown(s, loop))
        )
    # Your long-running tasks here
    await asyncio.sleep(3600)

asyncio.run(main())
```

Common Pitfalls

Don't Block the Event Loop

```python
# BAD - blocks entire event loop
async def bad():
    time.sleep(5)  # Blocking!
    return "done"

# GOOD - use async sleep or run_in_executor
async def good():
    await asyncio.sleep(5)
    return "done"
```

Don't Forget to Await

```python
# BAD - coroutine never runs
async def main():
    fetch_data()  # Missing await!

# GOOD
async def main():
    await fetch_data()
```

Create Tasks Properly

```python
# BAD - task may be garbage collected
async def main():
    asyncio.create_task(background_work())  # Task might not complete

# GOOD - keep reference
async def main():
    task = asyncio.create_task(background_work())
    await task  # or store in set
```

Don't Mix Sync and Async

```python
# BAD - calling async from sync incorrectly
def sync_function():
    result = async_function()  # Returns coroutine, not result

# GOOD - use asyncio.run or run_in_executor
def sync_function():
    result = asyncio.run(async_function())
```

Testing Async Code

```python
import pytest
import asyncio

@pytest.mark.asyncio
async def test_async_function():
    result = await my_async_function()
    assert result == expected

# Or with unittest
class TestAsync(unittest.IsolatedAsyncioTestCase):
    async def test_something(self):
        result = await my_async_function()
        self.assertEqual(result, expected)
```

Quick Reference

```python
# Run async function
asyncio.run(main())

# Concurrent execution
await asyncio.gather(task1(), task2())

# Create background task
task = asyncio.create_task(coro())

# Timeout
await asyncio.wait_for(coro(), timeout=5.0)

# Limit concurrency
semaphore = asyncio.Semaphore(10)
async with semaphore:
    ...

# Run blocking code
await loop.run_in_executor(None, blocking_func)

# Sleep
await asyncio.sleep(1)
```

Asyncio shines for I/O-bound workloads: HTTP requests, database queries, file operations. It won't help with CPU-bound work (use multiprocessing for that). ...

February 25, 2026 · 6 min · 1194 words · Rob Washington

Redis Patterns: Beyond Simple Key-Value Caching

Redis is often introduced as "just a cache," but it's a versatile data structure server. These patterns unlock its full potential.

Connection Basics

```
# Connect
redis-cli -h localhost -p 6379

# With password
redis-cli -h localhost -p 6379 -a yourpassword

# Select database (0-15)
SELECT 1

# Check connectivity
PING
```

Caching Patterns

Basic Cache with TTL

```
# Set with expiration (seconds)
SET user:123:profile '{"name":"Alice"}' EX 3600

# Set with expiration (milliseconds)
SET session:abc123 '{"user_id":123}' PX 86400000

# Set only if not exists
SETNX cache:key "value"

# Set only if exists (update)
SET cache:key "newvalue" XX
```

Cache-Aside Pattern

```python
def get_user(user_id):
    # Check cache first
    cached = redis.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)

    # Cache miss - fetch from database
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # Store in cache
    redis.setex(f"user:{user_id}", 3600, json.dumps(user))
    return user
```

Write-Through Pattern

```python
def update_user(user_id, data):
    # Update database
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)

    # Update cache immediately
    redis.setex(f"user:{user_id}", 3600, json.dumps(data))
```

Cache Stampede Prevention

```python
def get_with_lock(key, fetch_func, ttl=3600, lock_ttl=10):
    value = redis.get(key)
    if value:
        return json.loads(value)

    lock_key = f"lock:{key}"
    # Try to acquire lock
    if redis.set(lock_key, "1", nx=True, ex=lock_ttl):
        try:
            value = fetch_func()
            redis.setex(key, ttl, json.dumps(value))
            return value
        finally:
            redis.delete(lock_key)
    else:
        # Another process is fetching, wait and retry
        time.sleep(0.1)
        return get_with_lock(key, fetch_func, ttl, lock_ttl)
```

Session Storage

```python
import secrets

def create_session(user_id, ttl=86400):
    session_id = secrets.token_urlsafe(32)
    session_data = {
        "user_id": user_id,
        "created_at": time.time()
    }
    redis.setex(f"session:{session_id}", ttl, json.dumps(session_data))
    return session_id

def get_session(session_id):
    data = redis.get(f"session:{session_id}")
    return json.loads(data) if data else None

def extend_session(session_id, ttl=86400):
    redis.expire(f"session:{session_id}", ttl)

def destroy_session(session_id):
    redis.delete(f"session:{session_id}")
```

Rate Limiting

Fixed Window

```python
def is_rate_limited(user_id, limit=100, window=60):
    key = f"ratelimit:{user_id}:{int(time.time() // window)}"
    current = redis.incr(key)
    if current == 1:
        redis.expire(key, window)
    return current > limit
```

Sliding Window with Sorted Sets

```python
def is_rate_limited_sliding(user_id, limit=100, window=60):
    key = f"ratelimit:{user_id}"
    now = time.time()
    window_start = now - window

    pipe = redis.pipeline()
    # Remove old entries
    pipe.zremrangebyscore(key, 0, window_start)
    # Add current request
    pipe.zadd(key, {str(now): now})
    # Count requests in window
    pipe.zcard(key)
    # Set expiration
    pipe.expire(key, window)
    results = pipe.execute()

    request_count = results[2]
    return request_count > limit
```

Token Bucket

```python
def check_token_bucket(user_id, capacity=10, refill_rate=1):
    key = f"bucket:{user_id}"
    now = time.time()

    # Get current state
    data = redis.hgetall(key)
    if data:
        tokens = float(data[b'tokens'])
        last_update = float(data[b'last_update'])
        # Refill tokens based on elapsed time
        elapsed = now - last_update
        tokens = min(capacity, tokens + elapsed * refill_rate)
    else:
        tokens = capacity

    if tokens >= 1:
        # Consume a token
        redis.hset(key, mapping={
            'tokens': tokens - 1,
            'last_update': now
        })
        redis.expire(key, int(capacity / refill_rate) + 1)
        return True
    return False
```

Queues and Pub/Sub

Simple Queue with Lists

```python
# Producer
def enqueue(queue_name, message):
    redis.lpush(queue_name, json.dumps(message))

# Consumer (blocking)
def dequeue(queue_name, timeout=0):
    result = redis.brpop(queue_name, timeout)
    if result:
        return json.loads(result[1])
    return None
```

Reliable Queue with RPOPLPUSH

```python
def reliable_dequeue(queue_name, processing_queue):
    # Move item to processing queue atomically
    item = redis.rpoplpush(queue_name, processing_queue)
    return json.loads(item) if item else None

def ack(processing_queue, item):
    # Remove from processing queue when done
    redis.lrem(processing_queue, 1, json.dumps(item))

def requeue_failed(processing_queue, queue_name):
    # Move failed items back to main queue
    while True:
        item = redis.rpoplpush(processing_queue, queue_name)
        if not item:
            break
```

Pub/Sub

```python
# Publisher
def publish_event(channel, event):
    redis.publish(channel, json.dumps(event))

# Subscriber
def subscribe(channel, callback):
    pubsub = redis.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            callback(json.loads(message['data']))
```

Leaderboards with Sorted Sets

```python
def add_score(leaderboard, user_id, score):
    redis.zadd(leaderboard, {user_id: score})

def increment_score(leaderboard, user_id, amount):
    redis.zincrby(leaderboard, amount, user_id)

def get_rank(leaderboard, user_id):
    # 0-indexed, reverse order (highest first)
    rank = redis.zrevrank(leaderboard, user_id)
    return rank + 1 if rank is not None else None

def get_top(leaderboard, count=10):
    return redis.zrevrange(leaderboard, 0, count - 1, withscores=True)

def get_around_user(leaderboard, user_id, count=5):
    rank = redis.zrevrank(leaderboard, user_id)
    if rank is None:
        return []
    start = max(0, rank - count)
    end = rank + count
    return redis.zrevrange(leaderboard, start, end, withscores=True)
```

Distributed Locks

```python
import time
import uuid

class RedisLock:
    def __init__(self, redis_client, key, ttl=10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.ttl = ttl
        self.token = str(uuid.uuid4())

    def acquire(self, blocking=True, timeout=None):
        start = time.time()
        while True:
            if self.redis.set(self.key, self.token, nx=True, ex=self.ttl):
                return True
            if not blocking:
                return False
            if timeout and (time.time() - start) > timeout:
                return False
            time.sleep(0.1)

    def release(self):
        # Only release if we own the lock
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(script, 1, self.key, self.token)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()

# Usage
with RedisLock(redis, "my-resource"):
    # Critical section
    do_work()
```

Counting and Analytics

HyperLogLog for Unique Counts

```python
# Count unique visitors (memory efficient)
def track_visitor(page, visitor_id):
    redis.pfadd(f"visitors:{page}:{date.today()}", visitor_id)

def get_unique_visitors(page, date):
    return redis.pfcount(f"visitors:{page}:{date}")

# Merge multiple days
def get_weekly_uniques(page):
    keys = [f"visitors:{page}:{date}" for date in last_7_days()]
    return redis.pfcount(*keys)
```

Bitmaps for Daily Active Users

```python
def mark_active(user_id, day=None):
    day = day or date.today().isoformat()
    redis.setbit(f"active:{day}", user_id, 1)

def was_active(user_id, day):
    return redis.getbit(f"active:{day}", user_id) == 1

def count_active(day):
    return redis.bitcount(f"active:{day}")

# Users active on multiple days
def active_all_days(dates):
    keys = [f"active:{d}" for d in dates]
    result_key = "temp:active_intersection"
    redis.bitop("AND", result_key, *keys)
    count = redis.bitcount(result_key)
    redis.delete(result_key)
    return count
```

Expiration Strategies

```
# Set TTL
EXPIRE key 3600
EXPIREAT key 1735689600  # Unix timestamp

# Check TTL
TTL key  # Returns -1 if no expiry, -2 if doesn't exist

# Remove expiration
PERSIST key

# Set value and TTL atomically
SETEX key 3600 "value"
```

Lazy Expiration Pattern

```python
def get_with_soft_expire(key, ttl=3600, soft_ttl=300):
    """
    Returns cached value but triggers background refresh
    if within soft_ttl of expiration.
    """
    pipe = redis.pipeline()
    pipe.get(key)
    pipe.ttl(key)
    value, remaining_ttl = pipe.execute()

    if value and remaining_ttl < soft_ttl:
        # Trigger async refresh
        refresh_cache_async.delay(key)

    return value
```

Transactions and Lua Scripts

Pipeline (Batching)

```python
pipe = redis.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
pipe.execute()  # Single round trip
```

Transaction with WATCH

```python
def transfer(from_account, to_account, amount):
    with redis.pipeline() as pipe:
        while True:
            try:
                # Watch for changes
                pipe.watch(from_account, to_account)
                from_balance = int(pipe.get(from_account) or 0)
                if from_balance < amount:
                    pipe.unwatch()
                    return False
                # Start transaction
                pipe.multi()
                pipe.decrby(from_account, amount)
                pipe.incrby(to_account, amount)
                pipe.execute()
                return True
            except redis.WatchError:
                # Retry if watched keys changed
                continue
```

Lua Script (Atomic Operations)

```python
# Rate limiter as Lua script
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('INCR', key)
if current == 1 then
    redis.call('EXPIRE', key, window)
end

if current > limit then
    return 0
else
    return 1
end
"""

rate_limit = redis.register_script(RATE_LIMIT_SCRIPT)

def check_rate_limit(user_id, limit=100, window=60):
    key = f"ratelimit:{user_id}:{int(time.time() // window)}"
    return rate_limit(keys=[key], args=[limit, window]) == 1
```

Monitoring

```
# Real-time commands
MONITOR

# Stats
INFO
INFO memory
INFO stats

# Slow queries
SLOWLOG GET 10

# Connected clients
CLIENT LIST

# Memory usage for a key
MEMORY USAGE mykey
```

Redis excels when you match the right data structure to your problem. Lists for queues, sorted sets for leaderboards, HyperLogLog for counting uniques: each has its sweet spot. ...

February 25, 2026 · 7 min · 1467 words · Rob Washington

PostgreSQL Performance Tuning: From Slow Queries to Snappy Responses

PostgreSQL is fast out of the box. But "fast enough for development" and "fast enough for production" are different conversations. These techniques will help you find and fix performance bottlenecks.

Finding Slow Queries

Enable Query Logging

```sql
-- Log queries slower than 500ms
ALTER SYSTEM SET log_min_duration_statement = '500ms';
SELECT pg_reload_conf();

-- Check current setting
SHOW log_min_duration_statement;
```

pg_stat_statements Extension

The most valuable performance tool: ...
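The pg_stat_statements view aggregates timing statistics per normalized query, so the worst offenders surface immediately. A sketch of typical usage (column names assume PostgreSQL 13+, where `total_exec_time` replaced `total_time`; the extension must also be listed in `shared_preload_libraries` before it can collect data):

```sql
-- One-time setup, after adding the extension to shared_preload_libraries
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

-- Top 10 queries by cumulative execution time
SELECT query,
       calls,
       total_exec_time,
       mean_exec_time,
       rows
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;
```

Sorting by `total_exec_time` finds where the server spends its time overall; sorting by `mean_exec_time` instead finds individually slow queries even when they run rarely.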

February 25, 2026 · 9 min · 1770 words · Rob Washington

Redis Caching Patterns: Beyond Simple Key-Value

Redis is often introduced as "a cache," but it's really a data structure server. Understanding its primitives unlocks patterns far beyond simple key-value storage.

Basic Caching

The fundamental pattern: cache expensive operations.

```python
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user(user_id: str) -> dict:
    # Check cache first
    cached = r.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)

    # Cache miss - fetch from database
    user = db.query_user(user_id)

    # Store in cache with 1 hour TTL
    r.setex(f"user:{user_id}", 3600, json.dumps(user))
    return user
```

Cache-Aside Pattern

The application manages the cache explicitly: ...

February 24, 2026 · 7 min · 1302 words · Rob Washington