Rate limiting is the immune system of your API. Without it, a single misbehaving client can take down your service for everyone. With poorly designed limits, you’ll frustrate legitimate users while sophisticated attackers route around you.
The goal isn’t just protection—it’s fairness. Every user gets a reasonable share of your capacity.
The Basic Algorithms#
Fixed Window#
The simplest approach: count requests per time window, reject when over limit.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| import time
import redis
def is_rate_limited(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Fixed-window rate limit: allow up to *limit* requests per *window* seconds.

    Returns True when the caller has exceeded the limit for the current window.
    """
    client = redis.Redis()
    # All requests in the same window share one counter key.
    bucket_id = int(time.time() // window)
    key = f"ratelimit:{user_id}:{bucket_id}"
    hits = client.incr(key)
    # The first hit creates the key; give it a TTL so stale windows clean up.
    if hits == 1:
        client.expire(key, window)
    return hits > limit
|
Problem: Burst at window boundaries. A user can make 100 requests at 0:59 and 100 more at 1:00—200 requests in 2 seconds while technically staying under “100/minute.”
Sliding Window Log#
Track exact timestamps of all requests:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def is_rate_limited_sliding(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Sliding-window-log rate limit: at most *limit* requests in any *window* seconds.

    Stores one sorted-set entry per request (scored by timestamp) and prunes
    entries older than the window on every call. Returns True when the caller
    is over the limit.
    """
    import uuid  # local import: only needed to make zset members unique

    r = redis.Redis()
    key = f"ratelimit:{user_id}"
    now = time.time()
    # Drop entries that have fallen out of the sliding window.
    r.zremrangebyscore(key, 0, now - window)
    if r.zcard(key) >= limit:
        return True
    # BUG FIX: the member must be unique per request. Using str(now) alone
    # collapses concurrent requests with identical timestamps into a single
    # zset entry, undercounting traffic.
    member = f"{now}:{uuid.uuid4().hex}"
    r.zadd(key, {member: now})
    r.expire(key, window)
    return False
    # NOTE(review): the count-then-add sequence is not atomic across
    # processes; a Lua script would be needed for strict enforcement.
|
Accurate but expensive: O(n) memory per user, cleanup overhead. Fine for low-volume APIs, problematic at scale.
Sliding Window Counter#
Hybrid approach—approximate sliding window using two fixed windows:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def is_rate_limited_sliding_counter(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Sliding window counter: approximates sliding window with O(1) space."""
    client = redis.Redis()
    now = time.time()
    this_window = int(now // window)
    last_window = this_window - 1
    # Fraction of the current window already elapsed (0.0 to 1.0).
    elapsed_fraction = (now % window) / window
    this_key = f"ratelimit:{user_id}:{this_window}"
    last_key = f"ratelimit:{user_id}:{last_window}"
    this_count = int(client.get(this_key) or 0)
    last_count = int(client.get(last_key) or 0)
    # The previous window contributes in proportion to how much of it still
    # overlaps the sliding window.
    estimate = this_count + last_count * (1 - elapsed_fraction)
    if estimate >= limit:
        return True
    client.incr(this_key)
    # Keep the counter long enough to serve as the "previous window" next period.
    client.expire(this_key, window * 2)
    return False
|
Best of both worlds: O(1) space, smooth rate limiting, good enough accuracy for most use cases.
Token Bucket#
Allow bursts while maintaining average rate:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class TokenBucket:
    """Token-bucket rate limiter backed by Redis.

    A bucket holds at most ``capacity`` tokens and refills continuously at
    ``refill_rate`` tokens per second, so short bursts up to ``capacity`` are
    allowed while the long-run average stays at ``refill_rate``.
    """

    def __init__(self, capacity: int, refill_rate: float):
        """
        capacity: Maximum tokens (burst size)
        refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        # IMPROVED: reuse one client instead of opening a new connection on
        # every is_allowed() call.
        self._redis = redis.Redis()

    def is_allowed(self, user_id: str, tokens: int = 1) -> bool:
        """Try to consume *tokens* from the user's bucket; True if granted."""
        key = f"bucket:{user_id}"
        now = time.time()
        # Get current state
        data = self._redis.hgetall(key)
        if not data:
            # First sighting of this user: start with a full bucket.
            current_tokens = float(self.capacity)
            last_refill = now
        else:
            current_tokens = float(data[b'tokens'])
            last_refill = float(data[b'last_refill'])
        # Refill proportionally to elapsed time, capped at capacity.
        elapsed = now - last_refill
        current_tokens = min(
            self.capacity,
            current_tokens + (elapsed * self.refill_rate)
        )
        if current_tokens >= tokens:
            # Consume tokens and persist the new state.
            self._redis.hset(key, mapping={
                'tokens': current_tokens - tokens,
                'last_refill': now
            })
            # TTL: time to refill from empty plus slack, so idle buckets expire.
            self._redis.expire(key, int(self.capacity / self.refill_rate) + 60)
            return True
        # NOTE(review): this read-modify-write is not atomic across processes;
        # a Lua script or WATCH/MULTI would be needed for strict correctness.
        return False
# Example configuration: burst capacity of 100 tokens, refilled at 10 tokens/second.
# A client can burst 100 requests at once, then sustain 10 requests/second.
bucket = TokenBucket(capacity=100, refill_rate=10)
|
Perfect for: APIs where occasional bursts are acceptable but average rate must be controlled.
Multi-Level Limits#
Real APIs need multiple limit tiers:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
RATE_LIMITS = {
    'per_second': {'limit': 10, 'window': 1},
    'per_minute': {'limit': 100, 'window': 60},
    'per_hour': {'limit': 1000, 'window': 3600},
    'per_day': {'limit': 10000, 'window': 86400},
}


def check_all_limits(user_id: str) -> tuple[bool, str | None]:
    """Check all rate limit tiers. Returns (allowed, violated_tier).

    Tiers are checked in declaration order; the first violated tier is
    reported and later tiers are not evaluated.
    """
    violated = next(
        (
            tier
            for tier, cfg in RATE_LIMITS.items()
            if is_rate_limited_sliding_counter(
                f"{user_id}:{tier}", cfg['limit'], cfg['window']
            )
        ),
        None,
    )
    return (violated is None, violated)
|
This prevents both burst abuse (per-second) and sustained abuse (per-day) while allowing reasonable usage patterns.
Differentiated Limits#
Not all users deserve the same limits:
1
2
3
4
5
6
7
8
9
10
TIER_LIMITS = {
    'free': {'requests_per_hour': 100, 'requests_per_day': 1000},
    'basic': {'requests_per_hour': 1000, 'requests_per_day': 10000},
    'pro': {'requests_per_hour': 10000, 'requests_per_day': 100000},
    'enterprise': {'requests_per_hour': 100000, 'requests_per_day': None},  # Unlimited daily
}


async def get_rate_limit_config(user_id: str) -> dict:
    """Resolve the user's tier to its limit config; unknown tiers fall back to 'free'."""
    account = await get_user(user_id)
    if account.tier in TIER_LIMITS:
        return TIER_LIMITS[account.tier]
    return TIER_LIMITS['free']
|
Cost-Based Limits#
Some endpoints cost more than others:
1
2
3
4
5
6
7
8
9
10
ENDPOINT_COSTS = {
    '/api/v1/search': 10,  # Expensive database query
    '/api/v1/export': 100,  # Heavy processing
    '/api/v1/users/{id}': 1,  # Simple lookup
    'default': 1,
}


def check_rate_limit_with_cost(user_id: str, endpoint: str) -> bool:
    """Charge the endpoint's token cost against the shared token-bucket limiter."""
    try:
        cost = ENDPOINT_COSTS[endpoint]
    except KeyError:
        # Unlisted endpoints are billed at the default cost.
        cost = ENDPOINT_COSTS['default']
    return bucket.is_allowed(user_id, tokens=cost)
|
A user with 100 tokens/minute can make 100 simple lookups OR 10 searches OR 1 export.
Rate Limit Headers#
Always tell clients their limit status:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| from fastapi import Request, Response
async def rate_limit_middleware(request: Request, call_next):
    """FastAPI middleware: enforce per-user limits and expose X-RateLimit-* headers.

    Rejected requests receive a 429 JSON body plus Retry-After; allowed
    requests pass through with their remaining-quota headers attached.
    """
    user_id = get_user_id(request)
    config = await get_rate_limit_config(user_id)
    allowed, remaining, reset_at = check_limit_with_info(user_id, config)
    if not allowed:
        return Response(
            content='{"error": "Rate limit exceeded"}',
            status_code=429,
            # BUG FIX: declare the content type; a raw Response does not
            # default to application/json, so clients would mis-parse the body.
            media_type='application/json',
            headers={
                'X-RateLimit-Limit': str(config['requests_per_hour']),
                'X-RateLimit-Remaining': '0',
                'X-RateLimit-Reset': str(reset_at),
                # Seconds the client should wait before retrying.
                'Retry-After': str(reset_at - int(time.time())),
            }
        )
    response = await call_next(request)
    response.headers['X-RateLimit-Limit'] = str(config['requests_per_hour'])
    response.headers['X-RateLimit-Remaining'] = str(remaining)
    response.headers['X-RateLimit-Reset'] = str(reset_at)
    return response
|
Good clients will respect Retry-After. Bad clients will hammer anyway—but at least you’ve given them the information.
Graceful Degradation#
Don’t just reject—degrade gracefully:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def search_endpoint(query: str, user_id: str):
    """Search with graceful degradation as the caller's quota runs low.

    Full quota -> full results; low quota -> trimmed result count; nearly
    exhausted -> cached results; exhausted -> 429.
    """
    # FIX: removed a dead `await get_rate_limit_config(user_id)` whose result
    # was assigned but never used.
    remaining = get_remaining_tokens(user_id)
    if remaining <= 0:
        raise HTTPException(429, "Rate limit exceeded")
    if remaining < 10:
        # Low on quota: return cached/simplified results instead of failing.
        return await get_cached_search_results(query)
    if remaining < 50:
        # Getting low: limit result count.
        return await search(query, max_results=10)
    # Full quota: full results.
    return await search(query, max_results=100)
|
Users get degraded service instead of errors. Better UX, same protection.
Distributed Rate Limiting#
Single Redis works until it doesn’t. Options:
Redis Cluster#
1
2
3
4
from redis.cluster import RedisCluster
rc = RedisCluster(host='redis-cluster', port=6379)
# Same command surface as a single node; keys are automatically sharded
# across cluster nodes by hash slot.
|
Local + Global Limits#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import threading
class HybridRateLimiter:
    """Two-level limiter: a cheap in-process counter backed by a global check.

    Requests under the local budget are admitted without any network call;
    once the local counter is exhausted, the global check decides.

    NOTE(review): ``check_global`` is called but not defined in this snippet;
    it is assumed to return True when the user is globally over limit.
    """

    def __init__(self, local_limit: int, global_limit: int):
        self.local_limit = local_limit
        self.global_limit = global_limit
        self.local_counts = {}
        self.lock = threading.Lock()

    def is_allowed(self, user_id: str) -> bool:
        """Admit unless the local budget is spent AND the global check rejects."""
        with self.lock:
            seen = self.local_counts.get(user_id, 0)
            # Past the local budget: fall back to the (networked) global check.
            if seen >= self.local_limit and self.check_global(user_id):
                return False
            self.local_counts[user_id] = seen + 1
            return True
|
Local limits catch obvious abuse without network calls. Global limits ensure consistency across instances.
The Checklist#
Before shipping rate limiting:
- Choose an algorithm that matches your traffic shape (fixed window, sliding window, or token bucket).
- Layer limits across time scales (per-second through per-day) to stop both bursts and sustained abuse.
- Return 429 responses with X-RateLimit-* and Retry-After headers.
- Differentiate limits by user tier and by endpoint cost.
- Plan for multiple instances: a shared Redis backend or a local + global hybrid.
- Degrade gracefully where possible instead of hard-rejecting.
Rate limiting is a feature, not just protection. Done well, it makes your API predictable, fair, and sustainable.